From 81547c5981d281b2cacf2fddc1b6b2106b397e51 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Sun, 4 Jan 2026 17:57:49 +0800 Subject: [PATCH] feat: add tests for QueueTransportReadCloser to handle blocking reads and first chunk returns --- .../channel/test_transports.py | 72 ++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py b/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py index c4b6a4d83f..08fd0d3a3b 100644 --- a/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py +++ b/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py @@ -37,6 +37,67 @@ def test_queue_transport_reads_all_available_when_under_n() -> None: assert transport.read(32) == b"hithere" +def test_queue_transport_blocks_until_first_chunk() -> None: + transport = QueueTransportReadCloser() + writer = transport.get_write_handler() + started = threading.Event() + done = threading.Event() + result: list[bytes] = [] + errors: list[BaseException] = [] + + def reader() -> None: + started.set() + try: + result.append(transport.read(4)) + except BaseException as exc: + errors.append(exc) + finally: + done.set() + + thread = threading.Thread(target=reader) + thread.start() + assert started.wait(timeout=1) + assert not done.wait(timeout=0.1) + + writer.write(b"abcd") + if not done.wait(timeout=1): + transport.close() + thread.join(timeout=1) + assert not thread.is_alive() + assert not errors + assert result == [b"abcd"] + + +def test_queue_transport_returns_after_first_chunk_when_queue_empty() -> None: + transport = QueueTransportReadCloser() + writer = transport.get_write_handler() + started = threading.Event() + done = threading.Event() + result: list[bytes] = [] + errors: list[BaseException] = [] + + def reader() -> None: + started.set() + try: + result.append(transport.read(10)) + except BaseException as exc: + errors.append(exc) + finally: + done.set() + + thread = threading.Thread(target=reader) + thread.start() + assert started.wait(timeout=1) + assert not done.wait(timeout=0.1) + writer.write(b"abc") + if not done.wait(timeout=1): + transport.close() + thread.join(timeout=1) + assert not thread.is_alive() + assert not errors + assert result == [b"abc"] + + def test_queue_transport_returns_buffer_without_blocking() -> None: transport = QueueTransportReadCloser() writer = transport.get_write_handler() @@ -45,13 +106,22 @@ def test_queue_transport_returns_buffer_without_blocking() -> None: assert transport.read(4) == b"abcd" result: list[bytes] = [] - thread = threading.Thread(target=lambda: result.append(transport.read(4))) + errors: list[BaseException] = [] + + def reader() -> None: + try: + result.append(transport.read(4)) + except BaseException as exc: + errors.append(exc) + + thread = threading.Thread(target=reader) thread.start() thread.join(timeout=1) if thread.is_alive(): transport.close() thread.join(timeout=1) assert not thread.is_alive() + assert not errors assert result == [b"ef"]