From 1c7c475c4341ecd146cce7c166f32bb5539cc6ab Mon Sep 17 00:00:00 2001 From: Harry Date: Tue, 6 Jan 2026 19:30:38 +0800 Subject: [PATCH] feat: add Command node support - Introduced Command node type in workflow with associated UI components and translations. - Enhanced SandboxLayer to manage sandbox attachment for Command nodes during execution. - Updated various components and constants to integrate Command node functionality across the workflow. --- api/core/app/layers/sandbox_layer.py | 23 ++ api/core/workflow/enums.py | 1 + api/core/workflow/nodes/command/__init__.py | 3 + api/core/workflow/nodes/command/entities.py | 10 + api/core/workflow/nodes/command/exc.py | 16 ++ api/core/workflow/nodes/command/node.py | 270 ++++++++++++++++++ .../core/workflow/nodes/command/__init__.py | 0 .../nodes/command/test_command_node.py | 156 ++++++++++ web/app/components/workflow/block-icon.tsx | 2 + .../workflow/block-selector/constants.tsx | 5 + web/app/components/workflow/constants/node.ts | 2 + .../workflow-panel/last-run/use-last-run.ts | 2 + .../workflow/nodes/command/default.ts | 35 +++ .../workflow/nodes/command/node.tsx | 13 + .../workflow/nodes/command/panel.tsx | 71 +++++ .../workflow/nodes/command/types.ts | 6 + .../workflow/nodes/command/use-config.ts | 33 +++ .../components/workflow/nodes/components.ts | 4 + web/app/components/workflow/types.ts | 1 + web/app/components/workflow/utils/workflow.ts | 1 + web/i18n/en-US/workflow.json | 9 + web/i18n/zh-Hans/workflow.json | 9 + 22 files changed, 672 insertions(+) create mode 100644 api/core/workflow/nodes/command/__init__.py create mode 100644 api/core/workflow/nodes/command/entities.py create mode 100644 api/core/workflow/nodes/command/exc.py create mode 100644 api/core/workflow/nodes/command/node.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/command/__init__.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/command/test_command_node.py create mode 100644 web/app/components/workflow/nodes/command/default.ts create mode 100644 web/app/components/workflow/nodes/command/node.tsx create mode 100644 web/app/components/workflow/nodes/command/panel.tsx create mode 100644 web/app/components/workflow/nodes/command/types.ts create mode 100644 web/app/components/workflow/nodes/command/use-config.ts diff --git a/api/core/app/layers/sandbox_layer.py b/api/core/app/layers/sandbox_layer.py index ab426f1c29..387ead414c 100644 --- a/api/core/app/layers/sandbox_layer.py +++ b/api/core/app/layers/sandbox_layer.py @@ -2,14 +2,17 @@ Sandbox Layer for managing VirtualEnvironment lifecycle during workflow execution. """ +import contextlib import logging from collections.abc import Mapping from typing import Any from core.virtual_environment.__base.virtual_environment import VirtualEnvironment from core.virtual_environment.factory import SandboxFactory, SandboxType +from core.workflow.enums import NodeType from core.workflow.graph_engine.layers.base import GraphEngineLayer from core.workflow.graph_events.base import GraphEngineEvent +from core.workflow.nodes.base.node import Node logger = logging.getLogger(__name__) @@ -107,6 +110,26 @@ class SandboxLayer(GraphEngineLayer): """ pass + def on_node_run_start(self, node: Node[Any]) -> None: + """Attach sandbox handle to CommandNode instances.""" + if node.node_type is not NodeType.COMMAND: + return + + try: + # FIXME: type: ignore[attr-defined] + node.sandbox = self.sandbox # type: ignore[attr-defined] + except Exception: + logger.exception("Failed to attach sandbox to node") + + def on_node_run_end(self, node: Node[Any], error: Exception | None) -> None: + _ = error + if node.node_type is not NodeType.COMMAND: + return + + with contextlib.suppress(Exception): + # FIXME: type: ignore[attr-defined] + node.sandbox = None # type: ignore[attr-defined] + def on_graph_end(self, error: Exception | None) -> None: """ Release the sandbox when workflow execution ends. diff --git a/api/core/workflow/enums.py b/api/core/workflow/enums.py index c08b62a253..94ec89ad39 100644 --- a/api/core/workflow/enums.py +++ b/api/core/workflow/enums.py @@ -63,6 +63,7 @@ class NodeType(StrEnum): TRIGGER_SCHEDULE = "trigger-schedule" TRIGGER_PLUGIN = "trigger-plugin" HUMAN_INPUT = "human-input" + COMMAND = "command" @property def is_trigger_node(self) -> bool: diff --git a/api/core/workflow/nodes/command/__init__.py b/api/core/workflow/nodes/command/__init__.py new file mode 100644 index 0000000000..78ca8ca06d --- /dev/null +++ b/api/core/workflow/nodes/command/__init__.py @@ -0,0 +1,3 @@ +from .node import CommandNode + +__all__ = ["CommandNode"] diff --git a/api/core/workflow/nodes/command/entities.py b/api/core/workflow/nodes/command/entities.py new file mode 100644 index 0000000000..8a4f5f8b05 --- /dev/null +++ b/api/core/workflow/nodes/command/entities.py @@ -0,0 +1,10 @@ +from core.workflow.nodes.base import BaseNodeData + + +class CommandNodeData(BaseNodeData): + """ + Command Node Data. + """ + + working_directory: str = "" # Working directory for command execution + command: str = "" # Command to execute diff --git a/api/core/workflow/nodes/command/exc.py b/api/core/workflow/nodes/command/exc.py new file mode 100644 index 0000000000..c6349d5630 --- /dev/null +++ b/api/core/workflow/nodes/command/exc.py @@ -0,0 +1,16 @@ +class CommandNodeError(ValueError): + """Base class for command node errors.""" + + pass + + +class CommandExecutionError(CommandNodeError): + """Raised when command execution fails.""" + + pass + + +class CommandTimeoutError(CommandNodeError): + """Raised when command execution times out.""" + + pass diff --git a/api/core/workflow/nodes/command/node.py b/api/core/workflow/nodes/command/node.py new file mode 100644 index 0000000000..64f5f83d91 --- /dev/null +++ b/api/core/workflow/nodes/command/node.py @@ -0,0 +1,270 @@ +import contextlib +import logging +import shlex +import threading +import time +from collections.abc import Mapping, Sequence +from typing import Any + +from core.virtual_environment.__base.exec import NotSupportedOperationError +from core.virtual_environment.__base.virtual_environment import VirtualEnvironment +from core.virtual_environment.channel.exec import TransportEOFError +from core.virtual_environment.channel.transport import TransportReadCloser +from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus +from core.workflow.node_events import NodeRunResult +from core.workflow.nodes.base import variable_template_parser +from core.workflow.nodes.base.entities import VariableSelector +from core.workflow.nodes.base.node import Node +from core.workflow.nodes.base.variable_template_parser import VariableTemplateParser +from core.workflow.nodes.command.entities import CommandNodeData +from core.workflow.nodes.command.exc import CommandExecutionError, CommandTimeoutError + +logger = logging.getLogger(__name__) + +COMMAND_NODE_TIMEOUT_SECONDS = 60 + + +def _drain_transport(transport: TransportReadCloser, buffer: bytearray) -> None: + try: + while True: + buffer.extend(transport.read(4096)) + except TransportEOFError: + pass + except Exception: + logger.exception("Failed reading transport") + finally: + with contextlib.suppress(Exception): + transport.close() + + +class CommandNode(Node[CommandNodeData]): + """Command Node - execute shell commands in a VirtualEnvironment.""" + + # FIXME: This is a temporary solution for sandbox injection from SandboxLayer. + # The sandbox is dynamically attached by SandboxLayer.on_node_run_start() before + # node execution and cleared by on_node_run_end(). A cleaner approach would be + # to pass sandbox through GraphRuntimeState or use a proper dependency injection pattern. + sandbox: VirtualEnvironment | None = None + + def _render_template(self, template: str) -> str: + parser = VariableTemplateParser(template=template) + selectors = parser.extract_variable_selectors() + if not selectors: + return template + + inputs: dict[str, Any] = {} + for selector in selectors: + value = self.graph_runtime_state.variable_pool.get(selector.value_selector) + inputs[selector.variable] = value.to_object() if value is not None else None + + return parser.format(inputs) + + node_type = NodeType.COMMAND + + @classmethod + def get_default_config(cls, filters: Mapping[str, object] | None = None) -> Mapping[str, object]: + """Get default config of node.""" + return { + "type": "command", + "config": { + "working_directory": "", + "command": "", + }, + } + + @classmethod + def version(cls) -> str: + return "1" + + def _run(self) -> NodeRunResult: + if not isinstance(self.sandbox, VirtualEnvironment): + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + error="Sandbox not available for CommandNode.", + error_type="SandboxNotInitializedError", + ) + + working_directory = (self.node_data.working_directory or "").strip() + raw_command = (self.node_data.command or "").strip() + + working_directory = self._render_template(working_directory).strip() + raw_command = self._render_template(raw_command).strip() + + working_directory = working_directory or None + timeout_seconds = COMMAND_NODE_TIMEOUT_SECONDS + + if not raw_command: + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + error="Command is required.", + error_type="CommandNodeError", + ) + + shell_command = raw_command + if working_directory: + shell_command = f"cd {shlex.quote(working_directory)} && {raw_command}" + + command = ["sh", "-lc", shell_command] + + # 0 or negative means no timeout + deadline = None + if timeout_seconds > 0: + deadline = time.monotonic() + timeout_seconds + + connection_handle = self.sandbox.establish_connection() + + pid = "" + stdin_transport = None + stdout_transport = None + stderr_transport = None + threads: list[threading.Thread] = [] + stdout_buf = bytearray() + stderr_buf = bytearray() + + try: + pid, stdin_transport, stdout_transport, stderr_transport = self.sandbox.execute_command( + connection_handle, command + ) + + # This node currently does not support interactive stdin. + with contextlib.suppress(Exception): + stdin_transport.close() + + is_combined_stream = stdout_transport is stderr_transport + + stdout_thread = threading.Thread( + target=_drain_transport, + args=(stdout_transport, stdout_buf), + daemon=True, + ) + threads.append(stdout_thread) + stdout_thread.start() + + if not is_combined_stream: + stderr_thread = threading.Thread( + target=_drain_transport, + args=(stderr_transport, stderr_buf), + daemon=True, + ) + threads.append(stderr_thread) + stderr_thread.start() + + exit_code: int | None = None + + while True: + if deadline is not None and time.monotonic() > deadline: + raise CommandTimeoutError(f"Command timed out after {timeout_seconds}s") + + try: + status = self.sandbox.get_command_status(connection_handle, pid) + except NotSupportedOperationError: + break + + if status.status == status.Status.COMPLETED: + exit_code = status.exit_code + break + + time.sleep(0.1) + + # Ensure transports are fully drained. + def _join_all() -> bool: + for t in threads: + remaining = None + if deadline is not None: + remaining = max(0.0, deadline - time.monotonic()) + t.join(timeout=remaining) + if t.is_alive(): + return False + return True + + if not _join_all(): + raise CommandTimeoutError(f"Command output not drained within {timeout_seconds}s") + + stdout_text = stdout_buf.decode("utf-8", errors="replace") + stderr_text = "" if is_combined_stream else stderr_buf.decode("utf-8", errors="replace") + + outputs: dict[str, Any] = { + "stdout": stdout_text, + "stderr": stderr_text, + "exit_code": exit_code, + "pid": pid, + } + + if exit_code not in (None, 0): + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + outputs=outputs, + process_data={"command": command, "working_directory": working_directory}, + error=f"Command exited with code {exit_code}", + error_type=CommandExecutionError.__name__, + ) + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs=outputs, + process_data={"command": command, "working_directory": working_directory}, + ) + + except (CommandExecutionError, CommandTimeoutError) as e: + if isinstance(e, CommandTimeoutError) and stdout_transport is not None: + for transport in (stdout_transport, stderr_transport): + if transport is None: + continue + with contextlib.suppress(Exception): + transport.close() + + for t in threads: + t.join(timeout=0.2) + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + outputs={ + "stdout": stdout_buf.decode("utf-8", errors="replace"), + "stderr": stderr_buf.decode("utf-8", errors="replace"), + "exit_code": None, + "pid": pid, + }, + process_data={"command": command, "working_directory": working_directory}, + error=str(e), + error_type=type(e).__name__, + ) + except Exception as e: + logger.exception("Command node %s failed", self.id) + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + outputs={ + "stdout": stdout_buf.decode("utf-8", errors="replace"), + "stderr": stderr_buf.decode("utf-8", errors="replace"), + "exit_code": None, + "pid": pid, + }, + process_data={"command": command, "working_directory": working_directory}, + error=str(e), + error_type=type(e).__name__, + ) + finally: + with contextlib.suppress(Exception): + self.sandbox.release_connection(connection_handle) + + @classmethod + def _extract_variable_selector_to_variable_mapping( + cls, + *, + graph_config: Mapping[str, Any], + node_id: str, + node_data: Mapping[str, Any], + ) -> Mapping[str, Sequence[str]]: + """Extract variable mappings from node data.""" + _ = graph_config # Explicitly mark as unused + + typed_node_data = CommandNodeData.model_validate(node_data) + + selectors: list[VariableSelector] = [] + selectors += list(variable_template_parser.extract_selectors_from_template(typed_node_data.command)) + selectors += list(variable_template_parser.extract_selectors_from_template(typed_node_data.working_directory)) + + mapping: dict[str, Sequence[str]] = {} + for selector in selectors: + mapping[node_id + "." + selector.variable] = selector.value_selector + + return mapping diff --git a/api/tests/unit_tests/core/workflow/nodes/command/__init__.py b/api/tests/unit_tests/core/workflow/nodes/command/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/core/workflow/nodes/command/test_command_node.py b/api/tests/unit_tests/core/workflow/nodes/command/test_command_node.py new file mode 100644 index 0000000000..84dbf13c6b --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/command/test_command_node.py @@ -0,0 +1,156 @@ +import time +from io import BytesIO + +from core.virtual_environment.__base.entities import Arch, CommandStatus, ConnectionHandle, FileState, Metadata +from core.virtual_environment.__base.virtual_environment import VirtualEnvironment +from core.virtual_environment.channel.queue_transport import QueueTransportReadCloser +from core.virtual_environment.channel.transport import NopTransportWriteCloser +from core.workflow.entities import GraphInitParams +from core.workflow.enums import WorkflowNodeExecutionStatus +from core.workflow.nodes.command.node import CommandNode +from core.workflow.runtime import GraphRuntimeState, VariablePool +from core.workflow.system_variable import SystemVariable + + +class FakeSandbox(VirtualEnvironment): + def __init__( + self, + *, + stdout: bytes = b"", + stderr: bytes = b"", + statuses: list[CommandStatus] | None = None, + close_streams: bool = True, + ) -> None: + self._stdout_bytes = stdout + self._stderr_bytes = stderr + self._statuses = list(statuses or []) + self._close_streams = close_streams + self.last_execute_command: list[str] | None = None + self.released_connections: list[str] = [] + super().__init__(options={}, environments={}) + + def _construct_environment(self, options, environments): # type: ignore[override] + return Metadata(id="fake", arch=Arch.ARM64) + + def upload_file(self, path: str, content: BytesIO) -> None: + raise NotImplementedError + + def download_file(self, path: str) -> BytesIO: + raise NotImplementedError + + def list_files(self, directory_path: str, limit: int) -> list[FileState]: + return [] + + def establish_connection(self) -> ConnectionHandle: + return ConnectionHandle(id="conn") + + def release_connection(self, connection_handle: ConnectionHandle) -> None: + self.released_connections.append(connection_handle.id) + + def release_environment(self) -> None: + return + + def execute_command(self, connection_handle: ConnectionHandle, command: list[str], environments=None): # type: ignore[override] + _ = connection_handle + _ = environments + self.last_execute_command = command + + stdout = QueueTransportReadCloser() + stderr = QueueTransportReadCloser() + + if self._stdout_bytes: + stdout.get_write_handler().write(self._stdout_bytes) + if self._stderr_bytes: + stderr.get_write_handler().write(self._stderr_bytes) + + if self._close_streams: + stdout.close() + stderr.close() + + return "pid", NopTransportWriteCloser(), stdout, stderr + + def get_command_status(self, connection_handle: ConnectionHandle, pid: str) -> CommandStatus: + if self._statuses: + return self._statuses.pop(0) + return CommandStatus(status=CommandStatus.Status.COMPLETED, exit_code=0) + + +def _make_node(*, command: str, working_directory: str = "") -> CommandNode: + variable_pool = VariablePool(system_variables=SystemVariable.empty(), user_inputs={}) + runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + init_params = GraphInitParams( + tenant_id="t", + app_id="a", + workflow_id="w", + graph_config={}, + user_id="u", + user_from="account", + invoke_from="debugger", + call_depth=0, + ) + + return CommandNode( + id="node-instance", + config={ + "id": "node-config-id", + "data": { + "title": "Command", + "command": command, + "working_directory": working_directory, + }, + }, + graph_init_params=init_params, + graph_runtime_state=runtime_state, + ) + + +def test_command_node_success_executes_in_sandbox(): + node = _make_node(command="echo {{#pre_node_id.number#}}", working_directory="dir-{{#pre_node_id.number#}}") + node.graph_runtime_state.variable_pool.add(("pre_node_id", "number"), 42) + + sandbox = FakeSandbox(stdout=b"ok\n", stderr=b"") + node.sandbox = sandbox + + result = node._run() # pyright: ignore[reportPrivateUsage] + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs["stdout"] == "ok\n" + assert result.outputs["stderr"] == "" + assert result.outputs["exit_code"] == 0 + + assert sandbox.last_execute_command is not None + assert sandbox.last_execute_command[:2] == ["sh", "-lc"] + assert "cd dir-42 && echo 42" in sandbox.last_execute_command[2] + + +def test_command_node_nonzero_exit_code_returns_failed_result(): + node = _make_node(command="false") + sandbox = FakeSandbox( + stdout=b"out", + stderr=b"err", + statuses=[CommandStatus(status=CommandStatus.Status.COMPLETED, exit_code=2)], + ) + node.sandbox = sandbox + + result = node._run() # pyright: ignore[reportPrivateUsage] + + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.outputs["exit_code"] == 2 + assert "exited with code" in result.error + + +def test_command_node_timeout_returns_failed_result_and_closes_transports(): + node = _make_node(command="sleep 10") + sandbox = FakeSandbox( + stdout=b"", + stderr=b"", + statuses=[CommandStatus(status=CommandStatus.Status.RUNNING, exit_code=None)] * 100, + close_streams=False, + ) + node.sandbox = sandbox + + result = node._run() # pyright: ignore[reportPrivateUsage] + + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.error_type == "CommandTimeoutError" + assert "timed out" in result.error diff --git a/web/app/components/workflow/block-icon.tsx b/web/app/components/workflow/block-icon.tsx index 32b06d475f..67ab961725 100644 --- a/web/app/components/workflow/block-icon.tsx +++ b/web/app/components/workflow/block-icon.tsx @@ -45,6 +45,7 @@ const DEFAULT_ICON_MAP: Record = { [BlockEnum.Start]: 'bg-util-colors-blue-brand-blue-brand-500', [BlockEnum.LLM]: 'bg-util-colors-indigo-indigo-500', [BlockEnum.Code]: 'bg-util-colors-blue-blue-500', + [BlockEnum.Command]: 'bg-util-colors-blue-blue-500', [BlockEnum.End]: 'bg-util-colors-warning-warning-500', [BlockEnum.IfElse]: 'bg-util-colors-cyan-cyan-500', [BlockEnum.Iteration]: 'bg-util-colors-cyan-cyan-500', diff --git a/web/app/components/workflow/block-selector/constants.tsx b/web/app/components/workflow/block-selector/constants.tsx index 5ecce2aa49..3c4270fe35 100644 --- a/web/app/components/workflow/block-selector/constants.tsx +++ b/web/app/components/workflow/block-selector/constants.tsx @@ -147,6 +147,11 @@ export const BLOCKS = [ type: BlockEnum.ListFilter, title: 'List Filter', }, + { + classification: BlockClassificationEnum.Utilities, + type: BlockEnum.Command, + title: 'Command', + }, { classification: BlockClassificationEnum.Default, type: BlockEnum.Agent, diff --git a/web/app/components/workflow/constants/node.ts b/web/app/components/workflow/constants/node.ts index 5de9512752..02107af638 100644 --- a/web/app/components/workflow/constants/node.ts +++ b/web/app/components/workflow/constants/node.ts @@ -1,6 +1,7 @@ import agentDefault from '@/app/components/workflow/nodes/agent/default' import assignerDefault from '@/app/components/workflow/nodes/assigner/default' import codeDefault from '@/app/components/workflow/nodes/code/default' +import commandDefault from '@/app/components/workflow/nodes/command/default' import documentExtractorDefault from '@/app/components/workflow/nodes/document-extractor/default' @@ -33,6 +34,7 @@ export const WORKFLOW_COMMON_NODES = [ loopStartDefault, loopEndDefault, codeDefault, + commandDefault, templateTransformDefault, variableAggregatorDefault, documentExtractorDefault, diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts b/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts index 0de98db032..dafac33124 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts @@ -42,6 +42,7 @@ const singleRunFormParamsHooks: Record = { [BlockEnum.LLM]: useLLMSingleRunFormParams, [BlockEnum.KnowledgeRetrieval]: useKnowledgeRetrievalSingleRunFormParams, [BlockEnum.Code]: useCodeSingleRunFormParams, + [BlockEnum.Command]: undefined, [BlockEnum.TemplateTransform]: useTemplateTransformSingleRunFormParams, [BlockEnum.QuestionClassifier]: useQuestionClassifierSingleRunFormParams, [BlockEnum.HttpRequest]: useHttpRequestSingleRunFormParams, @@ -81,6 +82,7 @@ const getDataForCheckMoreHooks: Record = { [BlockEnum.LLM]: undefined, [BlockEnum.KnowledgeRetrieval]: undefined, [BlockEnum.Code]: undefined, + [BlockEnum.Command]: undefined, [BlockEnum.TemplateTransform]: undefined, [BlockEnum.QuestionClassifier]: undefined, [BlockEnum.HttpRequest]: undefined, diff --git a/web/app/components/workflow/nodes/command/default.ts b/web/app/components/workflow/nodes/command/default.ts new file mode 100644 index 0000000000..e80db6fa62 --- /dev/null +++ b/web/app/components/workflow/nodes/command/default.ts @@ -0,0 +1,35 @@ +import type { NodeDefault } from '../../types' +import type { CommandNodeType } from './types' +import { BlockClassificationEnum } from '@/app/components/workflow/block-selector/types' +import { BlockEnum } from '@/app/components/workflow/types' +import { genNodeMetaData } from '@/app/components/workflow/utils' + +const i18nPrefix = 'errorMsg' + +const metaData = genNodeMetaData({ + classification: BlockClassificationEnum.Utilities, + sort: 2, + type: BlockEnum.Command, +}) + +const nodeDefault: NodeDefault = { + metaData, + defaultValue: { + working_directory: '', + command: '', + }, + checkValid(payload: CommandNodeType, t: any) { + let errorMessages = '' + const { command } = payload + + if (!errorMessages && !command) + errorMessages = t(`${i18nPrefix}.fieldRequired`, { ns: 'workflow', field: t(`${i18nPrefix}.fields.command`, { ns: 'workflow' }) }) + + return { + isValid: !errorMessages, + errorMessage: errorMessages, + } + }, +} + +export default nodeDefault diff --git a/web/app/components/workflow/nodes/command/node.tsx b/web/app/components/workflow/nodes/command/node.tsx new file mode 100644 index 0000000000..dd4de19bfd --- /dev/null +++ b/web/app/components/workflow/nodes/command/node.tsx @@ -0,0 +1,13 @@ +import type { FC } from 'react' +import type { CommandNodeType } from './types' +import type { NodeProps } from '@/app/components/workflow/types' +import * as React from 'react' + +const Node: FC> = () => { + return ( + // No summary content - same as Code node +
+ ) +} + +export default React.memo(Node) diff --git a/web/app/components/workflow/nodes/command/panel.tsx b/web/app/components/workflow/nodes/command/panel.tsx new file mode 100644 index 0000000000..f70f3b9dfc --- /dev/null +++ b/web/app/components/workflow/nodes/command/panel.tsx @@ -0,0 +1,71 @@ +import type { FC } from 'react' +import type { CommandNodeType } from './types' +import type { NodePanelProps } from '@/app/components/workflow/types' +import * as React from 'react' +import { useTranslation } from 'react-i18next' +import Field from '@/app/components/workflow/nodes/_base/components/field' +import Input from '@/app/components/workflow/nodes/_base/components/input-support-select-var' +import Split from '@/app/components/workflow/nodes/_base/components/split' +import useAvailableVarList from '@/app/components/workflow/nodes/_base/hooks/use-available-var-list' +import useConfig from './use-config' + +const i18nPrefix = 'nodes.command' + +const Panel: FC> = ({ + id, + data, +}) => { + const { t } = useTranslation() + + const { + readOnly, + inputs, + handleWorkingDirectoryChange, + handleCommandChange, + } = useConfig(id, data) + + const { availableVars, availableNodesWithParent } = useAvailableVarList(id, { + onlyLeafNodeVar: false, + filterVar: () => true, + }) + + return ( +
+
+ + + + + + + +
+
+ ) +} + +export default React.memo(Panel) diff --git a/web/app/components/workflow/nodes/command/types.ts b/web/app/components/workflow/nodes/command/types.ts new file mode 100644 index 0000000000..af1ce5cc8b --- /dev/null +++ b/web/app/components/workflow/nodes/command/types.ts @@ -0,0 +1,6 @@ +import type { CommonNodeType } from '@/app/components/workflow/types' + +export type CommandNodeType = CommonNodeType & { + working_directory: string + command: string +} diff --git a/web/app/components/workflow/nodes/command/use-config.ts b/web/app/components/workflow/nodes/command/use-config.ts new file mode 100644 index 0000000000..e4212f0cc0 --- /dev/null +++ b/web/app/components/workflow/nodes/command/use-config.ts @@ -0,0 +1,33 @@ +import type { CommandNodeType } from './types' +import { produce } from 'immer' +import { useCallback } from 'react' +import { useNodesReadOnly } from '@/app/components/workflow/hooks' +import useNodeCrud from '@/app/components/workflow/nodes/_base/hooks/use-node-crud' + +const useConfig = (id: string, payload: CommandNodeType) => { + const { nodesReadOnly: readOnly } = useNodesReadOnly() + const { inputs, setInputs } = useNodeCrud(id, payload) + + const handleWorkingDirectoryChange = useCallback((value: string) => { + const newInputs = produce(inputs, (draft) => { + draft.working_directory = value + }) + setInputs(newInputs) + }, [inputs, setInputs]) + + const handleCommandChange = useCallback((value: string) => { + const newInputs = produce(inputs, (draft) => { + draft.command = value + }) + setInputs(newInputs) + }, [inputs, setInputs]) + + return { + readOnly, + inputs, + handleWorkingDirectoryChange, + handleCommandChange, + } +} + +export default useConfig diff --git a/web/app/components/workflow/nodes/components.ts b/web/app/components/workflow/nodes/components.ts index 87c0066d15..5a82a496d9 100644 --- a/web/app/components/workflow/nodes/components.ts +++ b/web/app/components/workflow/nodes/components.ts @@ -8,6 +8,8 @@ import AssignerNode from './assigner/node' import AssignerPanel from './assigner/panel' import CodeNode from './code/node' import CodePanel from './code/panel' +import CommandNode from './command/node' +import CommandPanel from './command/panel' import DataSourceNode from './data-source/node' import DataSourcePanel from './data-source/panel' import DocExtractorNode from './document-extractor/node' @@ -75,6 +77,7 @@ export const NodeComponentMap: Record> = { [BlockEnum.TriggerSchedule]: TriggerScheduleNode, [BlockEnum.TriggerWebhook]: TriggerWebhookNode, [BlockEnum.TriggerPlugin]: TriggerPluginNode, + [BlockEnum.Command]: CommandNode, } export const PanelComponentMap: Record> = { @@ -103,4 +106,5 @@ export const PanelComponentMap: Record> = { [BlockEnum.TriggerSchedule]: TriggerSchedulePanel, [BlockEnum.TriggerWebhook]: TriggerWebhookPanel, [BlockEnum.TriggerPlugin]: TriggerPluginPanel, + [BlockEnum.Command]: CommandPanel, } diff --git a/web/app/components/workflow/types.ts b/web/app/components/workflow/types.ts index 740f1c1113..be6bedca99 100644 --- a/web/app/components/workflow/types.ts +++ b/web/app/components/workflow/types.ts @@ -49,6 +49,7 @@ export enum BlockEnum { TriggerSchedule = 'trigger-schedule', TriggerWebhook = 'trigger-webhook', TriggerPlugin = 'trigger-plugin', + Command = 'command', } export enum ControlMode { diff --git a/web/app/components/workflow/utils/workflow.ts b/web/app/components/workflow/utils/workflow.ts index 7fabc51a45..ea92ba76e0 100644 --- a/web/app/components/workflow/utils/workflow.ts +++ b/web/app/components/workflow/utils/workflow.ts @@ -20,6 +20,7 @@ export const canRunBySingle = (nodeType: BlockEnum, isChildNode: boolean) => { return nodeType === BlockEnum.LLM || nodeType === BlockEnum.KnowledgeRetrieval || nodeType === BlockEnum.Code + || nodeType === BlockEnum.Command || nodeType === BlockEnum.TemplateTransform || nodeType === BlockEnum.QuestionClassifier || nodeType === BlockEnum.HttpRequest diff --git a/web/i18n/en-US/workflow.json b/web/i18n/en-US/workflow.json index 9dc21e79e6..8846039bbf 100644 --- a/web/i18n/en-US/workflow.json +++ b/web/i18n/en-US/workflow.json @@ -3,6 +3,7 @@ "blocks.answer": "Answer", "blocks.assigner": "Variable Assigner", "blocks.code": "Code", + "blocks.command": "Command", "blocks.datasource": "Data Source", "blocks.datasource-empty": "Empty Data Source", "blocks.document-extractor": "Doc Extractor", @@ -33,6 +34,7 @@ "blocksAbout.answer": "Define the reply content of a chat conversation", "blocksAbout.assigner": "The variable assignment node is used for assigning values to writable variables(like conversation variables).", "blocksAbout.code": "Execute a piece of Python or NodeJS code to implement custom logic", + "blocksAbout.command": "Execute shell commands in a subprocess", "blocksAbout.datasource": "Data Source About", "blocksAbout.datasource-empty": "Empty Data Source placeholder", "blocksAbout.document-extractor": "Used to parse uploaded documents into text content that is easily understandable by LLM.", @@ -296,6 +298,7 @@ "errorMsg.authRequired": "Authorization is required", "errorMsg.fieldRequired": "{{field}} is required", "errorMsg.fields.code": "Code", + "errorMsg.fields.command": "Command", "errorMsg.fields.model": "Model", "errorMsg.fields.rerankModel": "A configured Rerank Model", "errorMsg.fields.variable": "Variable Name", @@ -405,6 +408,12 @@ "nodes.code.outputVars": "Output Variables", "nodes.code.searchDependencies": "Search Dependencies", "nodes.code.syncFunctionSignature": "Sync function signature to code", + "nodes.command.command": "Command", + "nodes.command.commandPlaceholder": "Enter the command to execute, e.g., ls -la", + "nodes.command.seconds": "seconds", + "nodes.command.timeout": "Timeout", + "nodes.command.workingDirectory": "Working Directory", + "nodes.command.workingDirectoryPlaceholder": "Enter working directory path (optional)", "nodes.common.errorHandle.defaultValue.desc": "When an error occurs, specify a static output content.", "nodes.common.errorHandle.defaultValue.inLog": "Node exception, outputting according to default values.", "nodes.common.errorHandle.defaultValue.output": "Output Default Value", diff --git a/web/i18n/zh-Hans/workflow.json b/web/i18n/zh-Hans/workflow.json index 7787c9db4b..b246144712 100644 --- a/web/i18n/zh-Hans/workflow.json +++ b/web/i18n/zh-Hans/workflow.json @@ -3,6 +3,7 @@ "blocks.answer": "直接回复", "blocks.assigner": "变量赋值", "blocks.code": "代码执行", + "blocks.command": "命令执行", "blocks.datasource": "数据源", "blocks.datasource-empty": "空数据源", "blocks.document-extractor": "文档提取器", @@ -33,6 +34,7 @@ "blocksAbout.answer": "定义一个聊天对话的回复内容", "blocksAbout.assigner": "变量赋值节点用于向可写入变量(例如会话变量)进行变量赋值。", "blocksAbout.code": "执行一段 Python 或 NodeJS 代码实现自定义逻辑", + "blocksAbout.command": "在子进程中执行 shell 命令", "blocksAbout.datasource": "数据源节点", "blocksAbout.datasource-empty": "空数据源占位符", "blocksAbout.document-extractor": "用于将用户上传的文档解析为 LLM 便于理解的文本内容。", @@ -296,6 +298,7 @@ "errorMsg.authRequired": "请先授权", "errorMsg.fieldRequired": "{{field}} 不能为空", "errorMsg.fields.code": "代码", + "errorMsg.fields.command": "命令", "errorMsg.fields.model": "模型", "errorMsg.fields.rerankModel": "Rerank 模型", "errorMsg.fields.variable": "变量名", @@ -405,6 +408,12 @@ "nodes.code.outputVars": "输出变量", "nodes.code.searchDependencies": "搜索依赖", "nodes.code.syncFunctionSignature": "同步函数签名至代码", + "nodes.command.command": "命令", + "nodes.command.commandPlaceholder": "输入要执行的命令,例如 ls -la", + "nodes.command.seconds": "秒", + "nodes.command.timeout": "超时时间", + "nodes.command.workingDirectory": "工作目录", + "nodes.command.workingDirectoryPlaceholder": "输入工作目录路径(可选)", "nodes.common.errorHandle.defaultValue.desc": "当发生异常时,指定默认输出内容。", "nodes.common.errorHandle.defaultValue.inLog": "节点异常,根据默认值输出。", "nodes.common.errorHandle.defaultValue.output": "输出默认值",