feat(daytona-environment): enhance command management with threading support and default API URL

This commit is contained in:
Yeuoly 2026-01-07 18:47:22 +08:00
parent 0cd613ae52
commit fe4c591cfd

View File

@ -44,6 +44,8 @@ class DaytonaEnvironment(VirtualEnvironment):
Daytona virtual environment provider backed by Daytona Sandboxes.
"""
_DEFAULT_DAYTONA_API_URL = "https://app.daytona.io/api"
class OptionsKey(StrEnum):
API_KEY = "api_key"
API_URL = "api_url"
@ -62,16 +64,13 @@ class DaytonaEnvironment(VirtualEnvironment):
DAYTONA = "daytona"
SANDBOX = "sandbox"
WORKDIR = "workdir"
COMMANDS = "commands"
COMMANDS_LOCK = "commands_lock"
def __init__(self, options: Mapping[str, Any], environments: Mapping[str, str] | None = None) -> None:
self._commands: dict[str, _CommandRecord] = {}
self._commands_lock = threading.Lock()
super().__init__(options, environments)
def construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata:
def _construct_environment(self, options: Mapping[str, Any], environments: Mapping[str, str]) -> Metadata:
config = DaytonaConfig(
api_key=cast(str | None, options.get(self.OptionsKey.API_KEY)),
api_url=cast(str | None, options.get(self.OptionsKey.API_URL)),
api_url=cast(str | None, options.get(self.OptionsKey.API_URL, self._DEFAULT_DAYTONA_API_URL)),
target=cast(str | None, options.get(self.OptionsKey.TARGET)),
)
daytona = Daytona(config)
@ -122,6 +121,8 @@ class DaytonaEnvironment(VirtualEnvironment):
self.StoreKey.DAYTONA: daytona,
self.StoreKey.SANDBOX: sandbox,
self.StoreKey.WORKDIR: workdir,
self.StoreKey.COMMANDS: {},
self.StoreKey.COMMANDS_LOCK: threading.Lock(),
},
)
@ -179,6 +180,7 @@ class DaytonaEnvironment(VirtualEnvironment):
self, connection_handle: ConnectionHandle, command: list[str], environments: Mapping[str, str] | None = None
) -> tuple[str, TransportWriteCloser, TransportReadCloser, TransportReadCloser]:
sandbox: Sandbox = self.metadata.store[self.StoreKey.SANDBOX]
stdout_stream = QueueTransportReadCloser()
stderr_stream = QueueTransportReadCloser()
pid = uuid4().hex
@ -188,15 +190,17 @@ class DaytonaEnvironment(VirtualEnvironment):
args=(pid, sandbox, command, environments or {}, stdout_stream, stderr_stream),
daemon=True,
)
with self._commands_lock:
self._commands[pid] = {"thread": thread, "exit_code": None}
thread.start()
return pid, NopTransportWriteCloser(), stdout_stream, stderr_stream
def get_command_status(self, connection_handle: ConnectionHandle, pid: str) -> CommandStatus:
with self._commands_lock:
record = self._commands.get(pid)
commands: dict[str, _CommandRecord] = self.metadata.store[self.StoreKey.COMMANDS]
commands_lock: threading.Lock = self.metadata.store[self.StoreKey.COMMANDS_LOCK]
with commands_lock:
record = commands.get(pid)
if not record:
return CommandStatus(status=CommandStatus.Status.COMPLETED, exit_code=None)
@ -227,6 +231,9 @@ class DaytonaEnvironment(VirtualEnvironment):
stdout_stream: QueueTransportReadCloser,
stderr_stream: QueueTransportReadCloser,
) -> None:
commands: dict[str, _CommandRecord] = self.metadata.store[self.StoreKey.COMMANDS]
commands_lock: threading.Lock = self.metadata.store[self.StoreKey.COMMANDS_LOCK]
stdout_writer = stdout_stream.get_write_handler()
stderr_writer = stderr_stream.get_write_handler()
exit_code: int | None = None
@ -245,9 +252,9 @@ class DaytonaEnvironment(VirtualEnvironment):
finally:
stdout_stream.close()
stderr_stream.close()
with self._commands_lock:
if pid in self._commands:
self._commands[pid]["exit_code"] = exit_code
with commands_lock:
if pid in commands:
commands[pid]["exit_code"] = exit_code
def _parse_mod_time(self, mod_time: str) -> int:
try: