From cf7e2d5d754668ebd0bae16d390e911dd1996e56 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Thu, 1 Jan 2026 18:57:03 +0800 Subject: [PATCH] feat: add unit tests for transport classes including queue, pipe, and socket transports --- .../channel/test_transports.py | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 api/tests/unit_tests/core/virtual_environment/channel/test_transports.py diff --git a/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py b/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py new file mode 100644 index 0000000000..e2a84e6000 --- /dev/null +++ b/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py @@ -0,0 +1,120 @@ +import os +import socket + +import pytest + +from core.virtual_environment.channel.exec import TransportEOFError +from core.virtual_environment.channel.pipe_transport import PipeReadCloser, PipeTransport, PipeWriteCloser +from core.virtual_environment.channel.queue_transport import QueueTransportReadCloser +from core.virtual_environment.channel.socket_transport import SocketReadCloser, SocketTransport, SocketWriteCloser + + +def _close_socket(sock: socket.socket) -> None: + try: + sock.close() + except OSError: + pass + + +def test_queue_transport_reads_across_chunks() -> None: + transport = QueueTransportReadCloser() + writer = transport.get_write_handler() + writer.write(b"hello") + writer.write(b"world") + + data = transport.read(8) + assert data == b"hellowor" + assert transport.read(2) == b"ld" + + +def test_queue_transport_close_then_read_raises() -> None: + transport = QueueTransportReadCloser() + transport.close() + + with pytest.raises(TransportEOFError): + transport.read(1) + + +def test_queue_transport_close_twice_raises() -> None: + transport = QueueTransportReadCloser() + transport.close() + + with pytest.raises(TransportEOFError): + transport.close() + + +def test_pipe_transport_roundtrip() -> None: + r_fd, w_fd = os.pipe() + transport = PipeTransport(r_fd, w_fd) + try: + transport.write(b"ping") + assert transport.read(4) == b"ping" + finally: + transport.close() + + +def test_pipe_read_closer_eof_raises() -> None: + r_fd, w_fd = os.pipe() + os.close(w_fd) + reader = PipeReadCloser(r_fd) + try: + with pytest.raises(TransportEOFError): + reader.read(1) + finally: + reader.close() + + +def test_pipe_write_closer_eof_raises() -> None: + r_fd, w_fd = os.pipe() + os.close(r_fd) + writer = PipeWriteCloser(w_fd) + try: + with pytest.raises(TransportEOFError): + writer.write(b"x") + finally: + writer.close() + + +def test_socket_transport_roundtrip() -> None: + sock_a, sock_b = socket.socketpair() + sock_a_io = sock_a.makefile("rwb", buffering=0) + sock_b_io = sock_b.makefile("rwb", buffering=0) + transport_a = SocketTransport(sock_a_io) + transport_b = SocketTransport(sock_b_io) + try: + transport_a.write(b"x") + assert transport_b.read(1) == b"x" + finally: + transport_a.close() + transport_b.close() + _close_socket(sock_a) + _close_socket(sock_b) + + +def test_socket_read_closer_eof_raises() -> None: + sock_a, sock_b = socket.socketpair() + sock_a_io = sock_a.makefile("rb", buffering=0) + reader = SocketReadCloser(sock_a_io) + try: + sock_b.close() + with pytest.raises(TransportEOFError): + reader.read(1) + finally: + reader.close() + _close_socket(sock_a) + + +def test_socket_write_closer_writes() -> None: + sock_a, sock_b = socket.socketpair() + sock_a_io = sock_a.makefile("wb", buffering=0) + sock_b_io = sock_b.makefile("rb", buffering=0) + writer = SocketWriteCloser(sock_a_io) + reader = SocketReadCloser(sock_b_io) + try: + writer.write(b"y") + assert reader.read(1) == b"y" + finally: + writer.close() + reader.close() + _close_socket(sock_a) + _close_socket(sock_b)