feat: add unit tests for transport classes including queue, pipe, and socket transports

This commit is contained in:
Yeuoly 2026-01-01 18:57:03 +08:00
parent 2673fe05a5
commit cf7e2d5d75

View File

@ -0,0 +1,120 @@
import os
import socket
import pytest
from core.virtual_environment.channel.exec import TransportEOFError
from core.virtual_environment.channel.pipe_transport import PipeReadCloser, PipeTransport, PipeWriteCloser
from core.virtual_environment.channel.queue_transport import QueueTransportReadCloser
from core.virtual_environment.channel.socket_transport import SocketReadCloser, SocketTransport, SocketWriteCloser
def _close_socket(sock: socket.socket) -> None:
try:
sock.close()
except OSError:
pass
def test_queue_transport_reads_across_chunks() -> None:
transport = QueueTransportReadCloser()
writer = transport.get_write_handler()
writer.write(b"hello")
writer.write(b"world")
data = transport.read(8)
assert data == b"hellowor"
assert transport.read(2) == b"ld"
def test_queue_transport_close_then_read_raises() -> None:
transport = QueueTransportReadCloser()
transport.close()
with pytest.raises(TransportEOFError):
transport.read(1)
def test_queue_transport_close_twice_raises() -> None:
transport = QueueTransportReadCloser()
transport.close()
with pytest.raises(TransportEOFError):
transport.close()
def test_pipe_transport_roundtrip() -> None:
r_fd, w_fd = os.pipe()
transport = PipeTransport(r_fd, w_fd)
try:
transport.write(b"ping")
assert transport.read(4) == b"ping"
finally:
transport.close()
def test_pipe_read_closer_eof_raises() -> None:
r_fd, w_fd = os.pipe()
os.close(w_fd)
reader = PipeReadCloser(r_fd)
try:
with pytest.raises(TransportEOFError):
reader.read(1)
finally:
reader.close()
def test_pipe_write_closer_eof_raises() -> None:
r_fd, w_fd = os.pipe()
os.close(r_fd)
writer = PipeWriteCloser(w_fd)
try:
with pytest.raises(TransportEOFError):
writer.write(b"x")
finally:
writer.close()
def test_socket_transport_roundtrip() -> None:
sock_a, sock_b = socket.socketpair()
sock_a_io = sock_a.makefile("rwb", buffering=0)
sock_b_io = sock_b.makefile("rwb", buffering=0)
transport_a = SocketTransport(sock_a_io)
transport_b = SocketTransport(sock_b_io)
try:
transport_a.write(b"x")
assert transport_b.read(1) == b"x"
finally:
transport_a.close()
transport_b.close()
_close_socket(sock_a)
_close_socket(sock_b)
def test_socket_read_closer_eof_raises() -> None:
sock_a, sock_b = socket.socketpair()
sock_a_io = sock_a.makefile("rb", buffering=0)
reader = SocketReadCloser(sock_a_io)
try:
sock_b.close()
with pytest.raises(TransportEOFError):
reader.read(1)
finally:
reader.close()
_close_socket(sock_a)
def test_socket_write_closer_writes() -> None:
sock_a, sock_b = socket.socketpair()
sock_a_io = sock_a.makefile("wb", buffering=0)
sock_b_io = sock_b.makefile("rb", buffering=0)
writer = SocketWriteCloser(sock_a_io)
reader = SocketReadCloser(sock_b_io)
try:
writer.write(b"y")
assert reader.read(1) == b"y"
finally:
writer.close()
reader.close()
_close_socket(sock_a)
_close_socket(sock_b)