mirror of
https://github.com/langgenius/dify.git
synced 2026-01-24 04:32:18 +08:00
feat: enhance QueueTransportReadCloser to handle reading with available data and improve EOF handling
This commit is contained in:
parent
3842eade67
commit
ec29c24916
@ -73,11 +73,15 @@ class QueueTransportReadCloser(TransportReadCloser):
|
||||
raise TransportEOFError("Transport is closed")
|
||||
|
||||
to_return = self._drain_buffer(n)
|
||||
while len(to_return) < n and not self._closed:
|
||||
|
||||
while len(to_return) < n and not self._closed and self.q.qsize() > 0:
|
||||
chunk = self.q.get()
|
||||
if chunk is None:
|
||||
self._closed = True
|
||||
raise TransportEOFError("Transport is closed")
|
||||
if len(to_return) == 0:
|
||||
raise TransportEOFError("Transport is closed")
|
||||
else:
|
||||
break
|
||||
|
||||
self._read_buffer.extend(chunk)
|
||||
|
||||
@ -88,10 +92,6 @@ class QueueTransportReadCloser(TransportReadCloser):
|
||||
# No more data needed, break
|
||||
break
|
||||
|
||||
if self.q.qsize() == 0:
|
||||
# If no more data is available, break to return what we have
|
||||
break
|
||||
|
||||
return to_return
|
||||
|
||||
def _drain_buffer(self, n: int) -> bytes:
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
|
||||
import pytest
|
||||
|
||||
@ -27,6 +28,44 @@ def test_queue_transport_reads_across_chunks() -> None:
|
||||
assert transport.read(2) == b"ld"
|
||||
|
||||
|
||||
def test_queue_transport_reads_all_available_when_under_n() -> None:
|
||||
transport = QueueTransportReadCloser()
|
||||
writer = transport.get_write_handler()
|
||||
writer.write(b"hi")
|
||||
writer.write(b"there")
|
||||
|
||||
assert transport.read(32) == b"hithere"
|
||||
|
||||
|
||||
def test_queue_transport_returns_buffer_without_blocking() -> None:
|
||||
transport = QueueTransportReadCloser()
|
||||
writer = transport.get_write_handler()
|
||||
writer.write(b"abcdef")
|
||||
|
||||
assert transport.read(4) == b"abcd"
|
||||
|
||||
result: list[bytes] = []
|
||||
thread = threading.Thread(target=lambda: result.append(transport.read(4)))
|
||||
thread.start()
|
||||
thread.join(timeout=1)
|
||||
if thread.is_alive():
|
||||
transport.close()
|
||||
thread.join(timeout=1)
|
||||
assert not thread.is_alive()
|
||||
assert result == [b"ef"]
|
||||
|
||||
|
||||
def test_queue_transport_returns_data_before_eof() -> None:
|
||||
transport = QueueTransportReadCloser()
|
||||
writer = transport.get_write_handler()
|
||||
writer.write(b"end")
|
||||
transport.close()
|
||||
|
||||
assert transport.read(10) == b"end"
|
||||
with pytest.raises(TransportEOFError):
|
||||
transport.read(1)
|
||||
|
||||
|
||||
def test_queue_transport_close_then_read_raises() -> None:
|
||||
transport = QueueTransportReadCloser()
|
||||
transport.close()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user