diff --git a/api/core/model_runtime/model_providers/__base/large_language_model.py b/api/core/model_runtime/model_providers/__base/large_language_model.py
index 8638ee7d64..bbbdec61d1 100644
--- a/api/core/model_runtime/model_providers/__base/large_language_model.py
+++ b/api/core/model_runtime/model_providers/__base/large_language_model.py
@@ -92,6 +92,10 @@ def _build_llm_result_from_first_chunk(
     Build a single `LLMResult` from the first returned chunk.
 
     This is used for `stream=False` because the plugin side may still implement the response via a chunked stream.
+
+    Note:
+        This function always drains the `chunks` iterator after reading the first chunk to ensure any underlying
+        streaming resources are released (e.g., HTTP connections owned by the plugin runtime).
     """
     content = ""
     content_list: list[PromptMessageContentUnionTypes] = []
@@ -99,18 +103,25 @@ def _build_llm_result_from_first_chunk(
     system_fingerprint: str | None = None
     tools_calls: list[AssistantPromptMessage.ToolCall] = []
 
-    first_chunk = next(chunks, None)
-    if first_chunk is not None:
-        if isinstance(first_chunk.delta.message.content, str):
-            content += first_chunk.delta.message.content
-        elif isinstance(first_chunk.delta.message.content, list):
-            content_list.extend(first_chunk.delta.message.content)
+    try:
+        first_chunk = next(chunks, None)
+        if first_chunk is not None:
+            if isinstance(first_chunk.delta.message.content, str):
+                content += first_chunk.delta.message.content
+            elif isinstance(first_chunk.delta.message.content, list):
+                content_list.extend(first_chunk.delta.message.content)
 
-        if first_chunk.delta.message.tool_calls:
-            _increase_tool_call(first_chunk.delta.message.tool_calls, tools_calls)
+            if first_chunk.delta.message.tool_calls:
+                _increase_tool_call(first_chunk.delta.message.tool_calls, tools_calls)
 
-        usage = first_chunk.delta.usage or LLMUsage.empty_usage()
-        system_fingerprint = first_chunk.system_fingerprint
+            usage = first_chunk.delta.usage or LLMUsage.empty_usage()
+            system_fingerprint = first_chunk.system_fingerprint
+    finally:
+        try:
+            for _ in chunks:
+                pass
+        except Exception:
+            logger.debug("Failed to drain non-stream plugin chunk iterator.", exc_info=True)
 
     return LLMResult(
         model=model,
diff --git a/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py b/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py
index 91352b2a5f..cfdeef6a8d 100644
--- a/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py
+++ b/api/tests/unit_tests/core/model_runtime/__base/test_large_language_model_non_stream_parsing.py
@@ -101,3 +101,26 @@ def test__normalize_non_stream_plugin_result__empty_iterator_defaults():
     assert result.message.tool_calls == []
     assert result.usage == LLMUsage.empty_usage()
     assert result.system_fingerprint is None
+
+
+def test__normalize_non_stream_plugin_result__closes_chunk_iterator():
+    prompt_messages = [UserPromptMessage(content="hi")]
+
+    chunk = _make_chunk(content="hello", usage=LLMUsage.empty_usage())
+    closed: list[bool] = []
+
+    def _chunk_iter():
+        try:
+            yield chunk
+            yield _make_chunk(content="ignored", usage=LLMUsage.empty_usage())
+        finally:
+            closed.append(True)
+
+    result = _normalize_non_stream_plugin_result(
+        model="test-model",
+        prompt_messages=prompt_messages,
+        result=_chunk_iter(),
+    )
+
+    assert result.message.content == "hello"
+    assert closed == [True]
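
Note (not part of the patch): the drain-in-`finally` change relies on the fact that exhausting a generator runs the generator's own `finally` block, which is exactly what the new `closed` flag test asserts. A minimal standalone sketch of that mechanism, using illustrative names (`fake_stream`, `released`) that do not exist in the codebase:

```python
# Standalone sketch: draining a generator to exhaustion triggers its `finally`
# block, mimicking the release of plugin-owned streaming resources.
released: list[bool] = []


def fake_stream():
    try:
        yield "first"
        yield "ignored"
    finally:
        # Runs once the iterator is exhausted (or explicitly closed).
        released.append(True)


chunks = fake_stream()
first = next(chunks, None)  # read only the first chunk, as the non-stream path does
for _ in chunks:            # drain the remainder so the generator's finally executes
    pass

assert first == "first"
assert released == [True]
```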