From ec29c249160e7aa2899a845fb2c861b16d37cb9e Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Fri, 2 Jan 2026 15:03:04 +0800 Subject: [PATCH] feat: enhance QueueTransportReadCloser to handle reading with available data and improve EOF handling --- .../channel/queue_transport.py | 12 +++--- .../channel/test_transports.py | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/api/core/virtual_environment/channel/queue_transport.py b/api/core/virtual_environment/channel/queue_transport.py index 7fd9cbcc35..e3b09bd92e 100644 --- a/api/core/virtual_environment/channel/queue_transport.py +++ b/api/core/virtual_environment/channel/queue_transport.py @@ -73,11 +73,15 @@ class QueueTransportReadCloser(TransportReadCloser): raise TransportEOFError("Transport is closed") to_return = self._drain_buffer(n) - while len(to_return) < n and not self._closed: + + while len(to_return) < n and not self._closed and self.q.qsize() > 0: chunk = self.q.get() if chunk is None: self._closed = True - raise TransportEOFError("Transport is closed") + if len(to_return) == 0: + raise TransportEOFError("Transport is closed") + else: + break self._read_buffer.extend(chunk) @@ -88,10 +92,6 @@ class QueueTransportReadCloser(TransportReadCloser): # No more data needed, break break - if self.q.qsize() == 0: - # If no more data is available, break to return what we have - break - return to_return def _drain_buffer(self, n: int) -> bytes: diff --git a/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py b/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py index e2a84e6000..c4b6a4d83f 100644 --- a/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py +++ b/api/tests/unit_tests/core/virtual_environment/channel/test_transports.py @@ -1,5 +1,6 @@ import os import socket +import threading import pytest @@ -27,6 +28,44 @@ def test_queue_transport_reads_across_chunks() -> None: assert transport.read(2) == b"ld" +def test_queue_transport_reads_all_available_when_under_n() -> None: + transport = QueueTransportReadCloser() + writer = transport.get_write_handler() + writer.write(b"hi") + writer.write(b"there") + + assert transport.read(32) == b"hithere" + + +def test_queue_transport_returns_buffer_without_blocking() -> None: + transport = QueueTransportReadCloser() + writer = transport.get_write_handler() + writer.write(b"abcdef") + + assert transport.read(4) == b"abcd" + + result: list[bytes] = [] + thread = threading.Thread(target=lambda: result.append(transport.read(4))) + thread.start() + thread.join(timeout=1) + if thread.is_alive(): + transport.close() + thread.join(timeout=1) + assert not thread.is_alive() + assert result == [b"ef"] + + +def test_queue_transport_returns_data_before_eof() -> None: + transport = QueueTransportReadCloser() + writer = transport.get_write_handler() + writer.write(b"end") + transport.close() + + assert transport.read(10) == b"end" + with pytest.raises(TransportEOFError): + transport.read(1) + + def test_queue_transport_close_then_read_raises() -> None: transport = QueueTransportReadCloser() transport.close()