# dify/api/core/app/layers/sandbox_layer.py
# 2026-01-08 11:04:12 +08:00
#
# 183 lines
# 6.6 KiB
# Python

"""
Sandbox Layer for managing VirtualEnvironment lifecycle during workflow execution.
"""
import contextlib
import logging
from collections.abc import Mapping
from typing import Any
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.virtual_environment.factory import SandboxFactory, SandboxType
from core.workflow.enums import NodeType
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events.base import GraphEngineEvent
from core.workflow.nodes.base.node import Node
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class SandboxInitializationError(Exception):
    """Signals that a sandbox could not be initialized."""
class SandboxLayer(GraphEngineLayer):
    """
    Manages VirtualEnvironment (sandbox) lifecycle during workflow execution.

    Responsibilities:
    - on_graph_start: Initialize the sandbox environment
    - on_node_run_start / on_node_run_end: Attach the sandbox to COMMAND nodes
      while they run, then detach it again
    - on_graph_end: Release the sandbox environment (cleanup)

    Example:
        # Using tenant-specific configuration (recommended):
        layer = SandboxLayer(tenant_id="tenant-uuid")

        # Using explicit configuration (for testing/override):
        layer = SandboxLayer(
            sandbox_type=SandboxType.DOCKER,
            options={"docker_image": "python:3.11-slim"},
        )
        graph_engine.layer(layer)

        # During workflow execution, access sandbox via:
        # layer.sandbox.execute_command(...)
    """

    def __init__(
        self,
        tenant_id: str | None = None,
        sandbox_type: SandboxType | None = None,
        options: Mapping[str, Any] | None = None,
        environments: Mapping[str, str] | None = None,
    ) -> None:
        """
        Initialize the SandboxLayer.

        No sandbox is created here; creation is deferred to on_graph_start().

        Args:
            tenant_id: Tenant ID to load sandbox configuration from database.
                If provided (and non-empty), sandbox_type and options are
                ignored and loaded from the tenant's active sandbox provider.
            sandbox_type: Type of sandbox to create (default: DOCKER).
                Only used if tenant_id is not provided.
            options: Sandbox-specific configuration options.
                Only used if tenant_id is not provided.
            environments: Environment variables to set in the sandbox.
        """
        super().__init__()
        self._tenant_id = tenant_id
        self._sandbox_type = sandbox_type
        # Normalize missing mappings to empty ones so later calls never see None.
        self._options: Mapping[str, Any] = options or {}
        self._environments: Mapping[str, str] = environments or {}
        # Populated by on_graph_start(), cleared by on_graph_end().
        self._sandbox: VirtualEnvironment | None = None

    @property
    def sandbox(self) -> VirtualEnvironment:
        """
        Get the current sandbox instance.

        Returns:
            The initialized VirtualEnvironment instance

        Raises:
            RuntimeError: If sandbox has not been initialized
        """
        if self._sandbox is None:
            raise RuntimeError("Sandbox not initialized. Ensure on_graph_start() has been called.")
        return self._sandbox

    def on_graph_start(self) -> None:
        """
        Initialize the sandbox when workflow execution starts.

        If a non-empty tenant_id was provided, uses SandboxProviderService to
        create the sandbox with the tenant's active provider configuration.
        Otherwise, falls back to explicit sandbox_type/options, defaulting to
        SandboxType.DOCKER when no type was given.

        Raises:
            SandboxInitializationError: If sandbox cannot be created. The
                original exception is chained as the cause.
        """
        try:
            if self._tenant_id:
                # Use SandboxProviderService to create sandbox based on tenant config.
                # Imported lazily inside the method (presumably to avoid an import
                # cycle with the services package -- confirm before hoisting).
                from services.sandbox.sandbox_provider_service import SandboxProviderService

                logger.info("Initializing sandbox for tenant_id=%s", self._tenant_id)
                self._sandbox = SandboxProviderService.create_sandbox(
                    tenant_id=self._tenant_id,
                    environments=self._environments,
                )
            else:
                # Fallback to explicit configuration (backward compatibility)
                sandbox_type = self._sandbox_type or SandboxType.DOCKER
                logger.info("Initializing sandbox, sandbox_type=%s", sandbox_type)
                self._sandbox = SandboxFactory.create(
                    sandbox_type=sandbox_type,
                    options=self._options,
                    environments=self._environments,
                )
            logger.info(
                "Sandbox initialized, sandbox_id=%s, sandbox_arch=%s",
                self._sandbox.metadata.id,
                self._sandbox.metadata.arch,
            )
        except Exception as e:
            logger.exception("Failed to initialize sandbox")
            raise SandboxInitializationError(f"Failed to initialize sandbox: {e}") from e

    def on_event(self, event: GraphEngineEvent) -> None:
        """
        Handle graph engine events.

        Currently a no-op, but can be extended for sandbox monitoring/health checks.
        """
        pass

    def on_node_run_start(self, node: Node[Any]) -> None:
        """
        Attach sandbox handle to CommandNode instances.

        Nodes of any other type are left untouched. Failures (including the
        sandbox not being initialized) are logged and swallowed rather than
        propagated.
        """
        if node.node_type is not NodeType.COMMAND:
            return
        try:
            # FIXME: type: ignore[attr-defined]
            node.sandbox = self.sandbox  # type: ignore[attr-defined]
        except Exception:
            logger.exception("Failed to attach sandbox to node")

    def on_node_run_end(self, node: Node[Any], error: Exception | None) -> None:
        """
        Detach the sandbox handle from CommandNode instances after they run.

        Args:
            node: The node that finished running.
            error: The node's failure, if any. Not used by this layer.
        """
        _ = error  # explicitly unused; kept to satisfy the layer interface
        if node.node_type is not NodeType.COMMAND:
            return
        # Best-effort detach: a failure to clear the attribute is deliberately
        # ignored so it can never mask the node's own outcome.
        with contextlib.suppress(Exception):
            # FIXME: type: ignore[attr-defined]
            node.sandbox = None  # type: ignore[attr-defined]

    def on_graph_end(self, error: Exception | None) -> None:
        """
        Release the sandbox when workflow execution ends.

        This method is idempotent and will not raise exceptions on cleanup
        failure: once a sandbox has been detached from the layer, every later
        call is a no-op, and errors from reading the sandbox metadata or from
        release_environment() are logged instead of propagated.

        Args:
            error: The exception that caused execution to fail, or None if successful
        """
        sandbox = self._sandbox
        if sandbox is None:
            logger.debug("No sandbox to release")
            return

        # Detach the handle up front so the layer is reset no matter what
        # happens below.
        self._sandbox = None

        # Reading the id is only needed for log messages; a failure here must
        # neither escape nor prevent the release attempt.
        try:
            sandbox_id = sandbox.metadata.id
        except Exception:
            logger.exception("Failed to read sandbox metadata before release")
            sandbox_id = "<unknown>"

        logger.info("Releasing sandbox, sandbox_id=%s", sandbox_id)
        try:
            sandbox.release_environment()
        except Exception:
            # Log but don't raise - cleanup failures should not break workflow completion
            logger.exception("Failed to release sandbox, sandbox_id=%s", sandbox_id)
        else:
            logger.info("Sandbox released, sandbox_id=%s", sandbox_id)