feat: sandbox layer for workflow execution

This commit is contained in:
Harry 2026-01-06 15:47:20 +08:00
parent 36b7075cf4
commit caabca3f02
4 changed files with 598 additions and 0 deletions

View File

@ -0,0 +1,133 @@
"""
Sandbox Layer for managing VirtualEnvironment lifecycle during workflow execution.
"""
import logging
from collections.abc import Mapping
from typing import Any
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.virtual_environment.factory import SandboxFactory, SandboxType
from core.workflow.graph_engine.layers.base import GraphEngineLayer
from core.workflow.graph_events.base import GraphEngineEvent
logger = logging.getLogger(__name__)
class SandboxInitializationError(Exception):
"""Raised when sandbox initialization fails."""
pass
class SandboxLayer(GraphEngineLayer):
"""
Manages VirtualEnvironment (sandbox) lifecycle during workflow execution.
Responsibilities:
- on_graph_start: Initialize the sandbox environment
- on_graph_end: Release the sandbox environment (cleanup)
Example:
layer = SandboxLayer(
sandbox_type=SandboxType.DOCKER,
options={"docker_image": "python:3.11-slim"},
)
graph_engine.layer(layer)
# During workflow execution, access sandbox via:
# layer.sandbox.execute_command(...)
"""
def __init__(
self,
# TODO: read from db table
sandbox_type: SandboxType = SandboxType.DOCKER,
options: Mapping[str, Any] | None = None,
environments: Mapping[str, str] | None = None,
) -> None:
"""
Initialize the SandboxLayer.
Args:
sandbox_type: Type of sandbox to create (default: DOCKER)
options: Sandbox-specific configuration options
environments: Environment variables to set in the sandbox
"""
super().__init__()
self._sandbox_type = sandbox_type
self._options: Mapping[str, Any] = options or {}
self._environments: Mapping[str, str] = environments or {}
self._sandbox: VirtualEnvironment | None = None
@property
def sandbox(self) -> VirtualEnvironment:
"""
Get the current sandbox instance.
Returns:
The initialized VirtualEnvironment instance
Raises:
RuntimeError: If sandbox has not been initialized
"""
if self._sandbox is None:
raise RuntimeError("Sandbox not initialized. Ensure on_graph_start() has been called.")
return self._sandbox
def on_graph_start(self) -> None:
"""
Initialize the sandbox when workflow execution starts.
Raises:
SandboxInitializationError: If sandbox cannot be created
"""
logger.info("Initializing sandbox, sandbox_type=%s", self._sandbox_type)
try:
self._sandbox = SandboxFactory.create(
sandbox_type=self._sandbox_type,
options=self._options,
environments=self._environments,
)
logger.info(
"Sandbox initialized, sandbox_id=%s, sandbox_arch=%s",
self._sandbox.metadata.id,
self._sandbox.metadata.arch,
)
except Exception as e:
logger.exception("Failed to initialize sandbox")
raise SandboxInitializationError(f"Failed to initialize {self._sandbox_type} sandbox: {e}") from e
def on_event(self, event: GraphEngineEvent) -> None:
"""
Handle graph engine events.
Currently a no-op, but can be extended for sandbox monitoring/health checks.
"""
pass
def on_graph_end(self, error: Exception | None) -> None:
"""
Release the sandbox when workflow execution ends.
This method is idempotent and will not raise exceptions on cleanup failure.
Args:
error: The exception that caused execution to fail, or None if successful
"""
if self._sandbox is None:
logger.debug("No sandbox to release")
return
sandbox_id = self._sandbox.metadata.id
logger.info("Releasing sandbox, sandbox_id=%s", sandbox_id)
try:
self._sandbox.release_environment()
logger.info("Sandbox released, sandbox_id=%s", sandbox_id)
except Exception:
# Log but don't raise - cleanup failures should not break workflow completion
logger.exception("Failed to release sandbox, sandbox_id=%s", sandbox_id)
finally:
self._sandbox = None

View File

@ -0,0 +1,78 @@
"""
Sandbox factory for creating VirtualEnvironment instances.
Example:
sandbox = SandboxFactory.create(
SandboxType.DOCKER,
options={"docker_image": "python:3.11-slim"},
environments={"PATH": "/usr/local/bin"},
)
"""
from collections.abc import Mapping
from enum import StrEnum
from typing import Any
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
class SandboxType(StrEnum):
"""Supported sandbox types."""
DOCKER = "docker"
E2B = "e2b"
LOCAL = "local"
class SandboxFactory:
"""
Factory for creating VirtualEnvironment (sandbox) instances.
Uses lazy imports to avoid loading unused providers.
"""
@classmethod
def create(
cls,
sandbox_type: SandboxType,
options: Mapping[str, Any] | None = None,
environments: Mapping[str, str] | None = None,
) -> VirtualEnvironment:
"""
Create a VirtualEnvironment instance based on the specified type.
Args:
sandbox_type: Type of sandbox to create
options: Sandbox-specific configuration options
environments: Environment variables to set in the sandbox
Returns:
Configured VirtualEnvironment instance
Raises:
ValueError: If sandbox type is not supported
"""
options = options or {}
environments = environments or {}
sandbox_class = cls._get_sandbox_class(sandbox_type)
return sandbox_class(options=options, environments=environments)
@classmethod
def _get_sandbox_class(cls, sandbox_type: SandboxType) -> type[VirtualEnvironment]:
"""Get the sandbox class for the specified type (lazy import)."""
match sandbox_type:
case SandboxType.DOCKER:
from core.virtual_environment.providers.docker_daemon_sandbox import DockerDaemonEnvironment
return DockerDaemonEnvironment
case SandboxType.E2B:
from core.virtual_environment.providers.e2b_sandbox import E2BEnvironment
return E2BEnvironment
case SandboxType.LOCAL:
from core.virtual_environment.providers.local_without_isolation import LocalVirtualEnvironment
return LocalVirtualEnvironment
case _:
raise ValueError(f"Unsupported sandbox type: {sandbox_type}")

View File

@ -0,0 +1,243 @@
"""
Unit tests for the SandboxLayer.
This module tests the SandboxLayer lifecycle management including initialization,
event handling, and cleanup of VirtualEnvironment instances.
"""
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from core.app.layers.sandbox_layer import SandboxInitializationError, SandboxLayer
from core.virtual_environment.__base.entities import Arch
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.virtual_environment.factory import SandboxFactory, SandboxType
from core.workflow.graph_engine.layers.base import GraphEngineLayerNotInitializedError
from core.workflow.graph_events.graph import (
GraphRunFailedEvent,
GraphRunStartedEvent,
GraphRunSucceededEvent,
)
class MockMetadata:
"""Mock metadata for testing."""
def __init__(self, sandbox_id: str = "test-sandbox-id", arch: Arch = Arch.AMD64):
self.id = sandbox_id
self.arch = arch
class MockVirtualEnvironment:
"""Mock VirtualEnvironment for testing."""
def __init__(self, sandbox_id: str = "test-sandbox-id"):
self.metadata = MockMetadata(sandbox_id=sandbox_id)
self._released = False
def release_environment(self) -> None:
self._released = True
class TestSandboxLayer:
"""Unit tests for SandboxLayer."""
def test_init_with_default_parameters(self):
"""Test SandboxLayer initialization with default parameters."""
layer = SandboxLayer()
assert layer._sandbox_type == SandboxType.DOCKER
assert layer._options == {}
assert layer._environments == {}
assert layer._sandbox is None
def test_init_with_custom_parameters(self):
"""Test SandboxLayer initialization with custom parameters."""
layer = SandboxLayer(
sandbox_type=SandboxType.LOCAL,
options={"base_working_path": "/tmp/sandbox"},
environments={"PYTHONUNBUFFERED": "1"},
)
assert layer._sandbox_type == SandboxType.LOCAL
assert layer._options == {"base_working_path": "/tmp/sandbox"}
assert layer._environments == {"PYTHONUNBUFFERED": "1"}
def test_sandbox_property_raises_when_not_initialized(self):
"""Test that accessing sandbox property raises error before initialization."""
layer = SandboxLayer()
with pytest.raises(RuntimeError) as exc_info:
_ = layer.sandbox
assert "Sandbox not initialized" in str(exc_info.value)
def test_sandbox_property_returns_sandbox_after_initialization(self):
"""Test that sandbox property returns the sandbox after on_graph_start."""
layer = SandboxLayer()
mock_sandbox = MockVirtualEnvironment()
with patch.object(SandboxFactory, "create", return_value=mock_sandbox):
layer.on_graph_start()
assert layer.sandbox is mock_sandbox
def test_on_graph_start_creates_sandbox(self):
"""Test that on_graph_start creates a sandbox via factory."""
layer = SandboxLayer(
sandbox_type=SandboxType.DOCKER,
options={"docker_image": "python:3.11"},
environments={"PATH": "/usr/bin"},
)
mock_sandbox = MockVirtualEnvironment()
with patch.object(SandboxFactory, "create", return_value=mock_sandbox) as mock_create:
layer.on_graph_start()
mock_create.assert_called_once_with(
sandbox_type=SandboxType.DOCKER,
options={"docker_image": "python:3.11"},
environments={"PATH": "/usr/bin"},
)
def test_on_graph_start_raises_sandbox_initialization_error_on_failure(self):
"""Test that on_graph_start raises SandboxInitializationError on factory failure."""
layer = SandboxLayer(sandbox_type=SandboxType.DOCKER)
with patch.object(SandboxFactory, "create", side_effect=Exception("Docker not available")):
with pytest.raises(SandboxInitializationError) as exc_info:
layer.on_graph_start()
assert "Failed to initialize docker sandbox" in str(exc_info.value)
assert "Docker not available" in str(exc_info.value)
def test_on_event_is_noop(self):
"""Test that on_event does nothing (no-op)."""
layer = SandboxLayer()
# These should not raise any exceptions
layer.on_event(GraphRunStartedEvent())
layer.on_event(GraphRunSucceededEvent(outputs={}))
layer.on_event(GraphRunFailedEvent(error="test error", exceptions_count=1))
def test_on_graph_end_releases_sandbox(self):
"""Test that on_graph_end releases the sandbox."""
layer = SandboxLayer()
mock_sandbox = MagicMock(spec=VirtualEnvironment)
mock_sandbox.metadata = MockMetadata()
with patch.object(SandboxFactory, "create", return_value=mock_sandbox):
layer.on_graph_start()
layer.on_graph_end(error=None)
mock_sandbox.release_environment.assert_called_once()
assert layer._sandbox is None
def test_on_graph_end_releases_sandbox_even_on_error(self):
"""Test that on_graph_end releases sandbox even when workflow had an error."""
layer = SandboxLayer()
mock_sandbox = MagicMock(spec=VirtualEnvironment)
mock_sandbox.metadata = MockMetadata()
with patch.object(SandboxFactory, "create", return_value=mock_sandbox):
layer.on_graph_start()
layer.on_graph_end(error=Exception("Workflow failed"))
mock_sandbox.release_environment.assert_called_once()
assert layer._sandbox is None
def test_on_graph_end_handles_release_failure_gracefully(self):
"""Test that on_graph_end handles release failures without raising."""
layer = SandboxLayer()
mock_sandbox = MagicMock(spec=VirtualEnvironment)
mock_sandbox.metadata = MockMetadata()
mock_sandbox.release_environment.side_effect = Exception("Container already removed")
with patch.object(SandboxFactory, "create", return_value=mock_sandbox):
layer.on_graph_start()
# Should not raise exception
layer.on_graph_end(error=None)
mock_sandbox.release_environment.assert_called_once()
assert layer._sandbox is None
def test_on_graph_end_noop_when_sandbox_not_initialized(self):
"""Test that on_graph_end is a no-op when sandbox was never initialized."""
layer = SandboxLayer()
# Should not raise exception
layer.on_graph_end(error=None)
assert layer._sandbox is None
def test_on_graph_end_is_idempotent(self):
"""Test that calling on_graph_end multiple times is safe."""
layer = SandboxLayer()
mock_sandbox = MagicMock(spec=VirtualEnvironment)
mock_sandbox.metadata = MockMetadata()
with patch.object(SandboxFactory, "create", return_value=mock_sandbox):
layer.on_graph_start()
layer.on_graph_end(error=None)
layer.on_graph_end(error=None) # Second call should be no-op
mock_sandbox.release_environment.assert_called_once()
def test_layer_inherits_from_graph_engine_layer(self):
"""Test that SandboxLayer properly inherits from GraphEngineLayer."""
layer = SandboxLayer()
# Should have the graph_runtime_state property from base class
with pytest.raises(GraphEngineLayerNotInitializedError):
_ = layer.graph_runtime_state
# Should have command_channel from base class
assert layer.command_channel is None
class TestSandboxLayerIntegration:
"""Integration tests for SandboxLayer with real LocalVirtualEnvironment."""
def test_full_lifecycle_with_local_sandbox(self, tmp_path: Path):
"""Test complete lifecycle: init -> start -> end with local sandbox."""
layer = SandboxLayer(
sandbox_type=SandboxType.LOCAL,
options={"base_working_path": str(tmp_path)},
)
# Start
layer.on_graph_start()
# Verify sandbox is created
assert layer._sandbox is not None
sandbox_id = layer.sandbox.metadata.id
assert sandbox_id is not None
# End
layer.on_graph_end(error=None)
# Verify sandbox is released
assert layer._sandbox is None
def test_lifecycle_with_workflow_error(self, tmp_path: Path):
"""Test lifecycle when workflow encounters an error."""
layer = SandboxLayer(
sandbox_type=SandboxType.LOCAL,
options={"base_working_path": str(tmp_path)},
)
layer.on_graph_start()
assert layer.sandbox.metadata.id is not None
# Simulate workflow error
layer.on_graph_end(error=Exception("Workflow execution failed"))
# Sandbox should still be cleaned up
# pyright: ignore[reportPrivateUsage]
assert layer._sandbox is None # pyright: ignore[reportPrivateUsage]

View File

@ -0,0 +1,144 @@
"""
Unit tests for the SandboxFactory.
This module tests the factory pattern implementation for creating VirtualEnvironment instances
based on sandbox type, including error handling for unsupported types.
"""
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from core.virtual_environment.__base.virtual_environment import VirtualEnvironment
from core.virtual_environment.factory import SandboxFactory, SandboxType
class TestSandboxType:
"""Test cases for SandboxType enum."""
def test_sandbox_type_values(self):
"""Test that SandboxType enum has expected values."""
assert SandboxType.DOCKER == "docker"
assert SandboxType.E2B == "e2b"
assert SandboxType.LOCAL == "local"
def test_sandbox_type_is_string_enum(self):
"""Test that SandboxType values are strings."""
assert isinstance(SandboxType.DOCKER.value, str)
assert isinstance(SandboxType.E2B.value, str)
assert isinstance(SandboxType.LOCAL.value, str)
class TestSandboxFactory:
"""Test cases for SandboxFactory."""
def test_create_docker_sandbox_success(self):
"""Test successful Docker sandbox creation."""
mock_sandbox_instance = MagicMock(spec=VirtualEnvironment)
mock_sandbox_class = MagicMock(return_value=mock_sandbox_instance)
with patch.object(SandboxFactory, "_get_sandbox_class", return_value=mock_sandbox_class):
result = SandboxFactory.create(
sandbox_type=SandboxType.DOCKER,
options={"docker_image": "python:3.11-slim"},
environments={"PYTHONUNBUFFERED": "1"},
)
mock_sandbox_class.assert_called_once_with(
options={"docker_image": "python:3.11-slim"},
environments={"PYTHONUNBUFFERED": "1"},
)
assert result is mock_sandbox_instance
def test_create_with_none_options_uses_empty_dict(self):
"""Test that None options are converted to empty dict."""
mock_sandbox_instance = MagicMock(spec=VirtualEnvironment)
mock_sandbox_class = MagicMock(return_value=mock_sandbox_instance)
with patch.object(SandboxFactory, "_get_sandbox_class", return_value=mock_sandbox_class):
SandboxFactory.create(sandbox_type=SandboxType.DOCKER, options=None, environments=None)
mock_sandbox_class.assert_called_once_with(options={}, environments={})
def test_create_with_default_parameters(self):
"""Test sandbox creation with default parameters."""
mock_sandbox_instance = MagicMock(spec=VirtualEnvironment)
mock_sandbox_class = MagicMock(return_value=mock_sandbox_instance)
with patch.object(SandboxFactory, "_get_sandbox_class", return_value=mock_sandbox_class):
result = SandboxFactory.create(sandbox_type=SandboxType.DOCKER)
mock_sandbox_class.assert_called_once_with(options={}, environments={})
assert result is mock_sandbox_instance
def test_get_sandbox_class_docker_returns_correct_class(self):
"""Test that DOCKER type returns DockerDaemonEnvironment class."""
# Test by creating with mock to verify the class lookup works
mock_instance = MagicMock(spec=VirtualEnvironment)
with patch(
"core.virtual_environment.providers.docker_daemon_sandbox.DockerDaemonEnvironment",
return_value=mock_instance,
) as mock_docker_class:
SandboxFactory.create(sandbox_type=SandboxType.DOCKER)
mock_docker_class.assert_called_once()
def test_get_sandbox_class_local_returns_correct_class(self):
"""Test that LOCAL type returns LocalVirtualEnvironment class."""
mock_instance = MagicMock(spec=VirtualEnvironment)
with patch(
"core.virtual_environment.providers.local_without_isolation.LocalVirtualEnvironment",
return_value=mock_instance,
) as mock_local_class:
SandboxFactory.create(sandbox_type=SandboxType.LOCAL)
mock_local_class.assert_called_once()
def test_get_sandbox_class_e2b_returns_correct_class(self):
"""Test that E2B type returns E2BEnvironment class."""
mock_instance = MagicMock(spec=VirtualEnvironment)
with patch(
"core.virtual_environment.providers.e2b_sandbox.E2BEnvironment",
return_value=mock_instance,
) as mock_e2b_class:
SandboxFactory.create(sandbox_type=SandboxType.E2B)
mock_e2b_class.assert_called_once()
def test_create_with_unsupported_type_raises_value_error(self):
"""Test that unsupported sandbox type raises ValueError."""
with pytest.raises(ValueError) as exc_info:
SandboxFactory.create(sandbox_type="unsupported_type") # type: ignore[arg-type]
assert "Unsupported sandbox type: unsupported_type" in str(exc_info.value)
def test_create_propagates_instantiation_error(self):
"""Test that sandbox instantiation errors are propagated."""
mock_sandbox_class = MagicMock()
mock_sandbox_class.side_effect = Exception("Docker daemon not available")
with patch.object(SandboxFactory, "_get_sandbox_class", return_value=mock_sandbox_class):
with pytest.raises(Exception) as exc_info:
SandboxFactory.create(sandbox_type=SandboxType.DOCKER)
assert "Docker daemon not available" in str(exc_info.value)
class TestSandboxFactoryIntegration:
"""Integration tests for SandboxFactory with real providers (using LOCAL type)."""
def test_create_local_sandbox_integration(self, tmp_path: Path):
"""Test creating a real local sandbox."""
sandbox = SandboxFactory.create(
sandbox_type=SandboxType.LOCAL,
options={"base_working_path": str(tmp_path)},
environments={},
)
try:
assert sandbox is not None
assert sandbox.metadata.id is not None
assert sandbox.metadata.arch is not None
finally:
sandbox.release_environment()