feat: add tests for QueueTransportReadCloser to handle blocking reads and first chunk returns

This commit is contained in:
Yeuoly 2026-01-04 17:57:49 +08:00
parent a911b268aa
commit 81547c5981

View File

@ -37,6 +37,67 @@ def test_queue_transport_reads_all_available_when_under_n() -> None:
assert transport.read(32) == b"hithere"
def test_queue_transport_blocks_until_first_chunk() -> None:
transport = QueueTransportReadCloser()
writer = transport.get_write_handler()
started = threading.Event()
done = threading.Event()
result: list[bytes] = []
errors: list[BaseException] = []
def reader() -> None:
started.set()
try:
result.append(transport.read(4))
except BaseException as exc:
errors.append(exc)
finally:
done.set()
thread = threading.Thread(target=reader)
thread.start()
assert started.wait(timeout=1)
assert not done.wait(timeout=0.1)
writer.write(b"abcd")
if not done.wait(timeout=1):
transport.close()
thread.join(timeout=1)
assert not thread.is_alive()
assert not errors
assert result == [b"abcd"]
def test_queue_transport_returns_after_first_chunk_when_queue_empty() -> None:
transport = QueueTransportReadCloser()
writer = transport.get_write_handler()
started = threading.Event()
done = threading.Event()
result: list[bytes] = []
errors: list[BaseException] = []
def reader() -> None:
started.set()
try:
result.append(transport.read(10))
except BaseException as exc:
errors.append(exc)
finally:
done.set()
thread = threading.Thread(target=reader)
thread.start()
assert started.wait(timeout=1)
assert not done.wait(timeout=0.1)
writer.write(b"abc")
if not done.wait(timeout=1):
transport.close()
thread.join(timeout=1)
assert not thread.is_alive()
assert not errors
assert result == [b"abc"]
def test_queue_transport_returns_buffer_without_blocking() -> None:
transport = QueueTransportReadCloser()
writer = transport.get_write_handler()
@ -45,13 +106,22 @@ def test_queue_transport_returns_buffer_without_blocking() -> None:
assert transport.read(4) == b"abcd"
result: list[bytes] = []
thread = threading.Thread(target=lambda: result.append(transport.read(4)))
errors: list[BaseException] = []
def reader() -> None:
try:
result.append(transport.read(4))
except BaseException as exc:
errors.append(exc)
thread = threading.Thread(target=reader)
thread.start()
thread.join(timeout=1)
if thread.is_alive():
transport.close()
thread.join(timeout=1)
assert not thread.is_alive()
assert not errors
assert result == [b"ef"]