From dea4bad4cab8d3d1fae087d2413759c383a14dc9 Mon Sep 17 00:00:00 2001 From: suyao Date: Sun, 4 Jan 2026 17:29:13 +0800 Subject: [PATCH 1/4] fix: add dispose method to prevent abort listener leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add dispose() method to StreamAbortController that explicitly removes the abort event listener when stream ends normally. Previously, the listener would only be removed when abort was triggered ({ once: true }), but if the stream completed normally without abort, the listener would remain attached until garbage collection. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../routes/agents/handlers/messages.ts | 34 +++++++++---------- .../utils/createStreamAbortController.ts | 13 ++++++- 2 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index 1b547abba8..abec51ec01 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -1,6 +1,10 @@ import { loggerService } from '@logger' import { MESSAGE_STREAM_TIMEOUT_MS } from '@main/apiServer/config/timeouts' -import { createStreamAbortController, STREAM_TIMEOUT_REASON } from '@main/apiServer/utils/createStreamAbortController' +import { + createStreamAbortController, + STREAM_TIMEOUT_REASON, + type StreamAbortController +} from '@main/apiServer/utils/createStreamAbortController' import { agentService, sessionMessageService, sessionService } from '@main/services/agents' import type { Request, Response } from 'express' @@ -26,7 +30,7 @@ const verifyAgentAndSession = async (agentId: string, sessionId: string) => { } export const createMessage = async (req: Request, res: Response): Promise => { - let clearAbortTimeout: (() => void) | undefined + let streamController: StreamAbortController | undefined try { const { agentId, sessionId } = req.params @@ -45,14 +49,10 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const { - abortController, - registerAbortHandler, - clearAbortTimeout: helperClearAbortTimeout - } = createStreamAbortController({ + streamController = createStreamAbortController({ timeoutMs: MESSAGE_STREAM_TIMEOUT_MS }) - clearAbortTimeout = helperClearAbortTimeout + const { abortController, registerAbortHandler, dispose } = streamController const { stream, completion } = await sessionMessageService.createSessionMessage( session, messageData, @@ -64,8 +64,8 @@ export const createMessage = async (req: Request, res: Response): Promise let responseEnded = false let streamFinished = false - const cleanupAbortTimeout = () => { - clearAbortTimeout?.() + const cleanup = () => { + dispose() } const finalizeResponse = () => { @@ -78,7 +78,7 @@ export const createMessage = async (req: Request, res: Response): Promise } responseEnded = true - cleanupAbortTimeout() + cleanup() try { // res.write('data: {"type":"finish"}\n\n') res.write('data: [DONE]\n\n') @@ -108,7 +108,7 @@ export const createMessage = async (req: Request, res: Response): Promise * - Mark the response as ended to prevent further writes */ registerAbortHandler((abortReason) => { - cleanupAbortTimeout() + cleanup() if (responseEnded) return @@ -189,7 +189,7 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing stream error to SSE', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() } } @@ -221,14 +221,14 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error('Error writing completion error to SSE stream', { error: writeError }) } responseEnded = true - cleanupAbortTimeout() + cleanup() res.end() }) // Clear timeout when response ends - res.on('close', cleanupAbortTimeout) - res.on('finish', cleanupAbortTimeout) + res.on('close', cleanup) + res.on('finish', cleanup) } catch (error: any) { - clearAbortTimeout?.() + streamController?.dispose() logger.error('Error in streaming message handler', { error, agentId: req.params.agentId, diff --git a/src/main/apiServer/utils/createStreamAbortController.ts b/src/main/apiServer/utils/createStreamAbortController.ts index 243ad5b96e..e07b9a31f0 100644 --- a/src/main/apiServer/utils/createStreamAbortController.ts +++ b/src/main/apiServer/utils/createStreamAbortController.ts @@ -4,6 +4,7 @@ export interface StreamAbortController { abortController: AbortController registerAbortHandler: (handler: StreamAbortHandler) => void clearAbortTimeout: () => void + dispose: () => void } export const STREAM_TIMEOUT_REASON = 'stream timeout' @@ -40,6 +41,15 @@ export const createStreamAbortController = (options: CreateStreamAbortController signal.addEventListener('abort', handleAbort, { once: true }) + let disposed = false + + const dispose = () => { + if (disposed) return + disposed = true + clearAbortTimeout() + signal.removeEventListener('abort', handleAbort) + } + const registerAbortHandler = (handler: StreamAbortHandler) => { abortHandler = handler @@ -59,6 +69,7 @@ export const createStreamAbortController = (options: CreateStreamAbortController return { abortController, registerAbortHandler, - clearAbortTimeout + clearAbortTimeout, + dispose } } From 3b44392e5aa81e985bb4f88e5ec4c5b21af81bde Mon Sep 17 00:00:00 2001 From: suyao Date: Sun, 4 Jan 2026 18:10:39 +0800 Subject: [PATCH 2/4] feat: implement abort signal handling for streaming responses --- .../apiServer/services/ProxyStreamService.ts | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/main/apiServer/services/ProxyStreamService.ts b/src/main/apiServer/services/ProxyStreamService.ts index 5519dc49ef..90149a617e 100644 --- a/src/main/apiServer/services/ProxyStreamService.ts +++ b/src/main/apiServer/services/ProxyStreamService.ts @@ -33,6 +33,8 @@ import type { Response } from 'express' import type { InputFormat, InputParamsMap, IStreamAdapter } from '../adapters' import { MessageConverterFactory, type OutputFormat, StreamAdapterFactory } from '../adapters' +import { LONG_POLL_TIMEOUT_MS } from '../config/timeouts' +import { createStreamAbortController } from '../utils/createStreamAbortController' import { googleReasoningCache, openRouterReasoningCache } from './reasoning-cache' const logger = loggerService.withContext('ProxyStreamService') @@ -96,6 +98,7 @@ interface ExecuteStreamConfig { outputFormat: OutputFormat middlewares?: LanguageModelMiddleware[] plugins?: AiPlugin[] + abortSignal?: AbortSignal } // ============================================================================ @@ -248,7 +251,7 @@ async function executeStream(config: ExecuteStreamConfig): Promise<{ adapter: IStreamAdapter outputStream: ReadableStream }> { - const { provider, modelId, params, inputFormat, outputFormat, middlewares = [], plugins = [] } = config + const { provider, modelId, params, inputFormat, outputFormat, middlewares = [], plugins = [], abortSignal } = config // Convert provider config to AI SDK config let sdkConfig = providerToAiSdkConfig(provider, modelId) @@ -291,7 +294,8 @@ async function executeStream(config: ExecuteStreamConfig): Promise<{ stopWhen: stepCountIs(100), headers: defaultAppHeaders(), tools, - providerOptions + providerOptions, + abortSignal }) // Transform stream using adapter @@ -344,6 +348,21 @@ export async function streamToResponse(config: StreamConfig): Promise { pluginCount: plugins.length }) + // Create abort controller for client disconnect handling + const streamController = createStreamAbortController({ + timeoutMs: LONG_POLL_TIMEOUT_MS + }) + const { abortController, dispose } = streamController + + // Handle client disconnect + const handleDisconnect = () => { + if (abortController.signal.aborted) return + logger.info('Client disconnected, aborting stream', { providerId: provider.id, modelId }) + abortController.abort('Client disconnected') + } + + response.on('close', handleDisconnect) + try { // Set SSE headers response.setHeader('Content-Type', 'text/event-stream') @@ -358,7 +377,8 @@ export async function streamToResponse(config: StreamConfig): Promise { inputFormat, outputFormat, middlewares, - plugins + plugins, + abortSignal: abortController.signal }) // Get formatter for the output format @@ -370,6 +390,7 @@ export async function streamToResponse(config: StreamConfig): Promise { while (true) { const { done, value } = await reader.read() if (done) break + if (response.writableEnded) break response.write(formatter.formatEvent(value)) } } finally { @@ -377,8 +398,10 @@ export async function streamToResponse(config: StreamConfig): Promise { } // Send done marker and end response - response.write(formatter.formatDone()) - response.end() + if (!response.writableEnded) { + response.write(formatter.formatDone()) + response.end() + } logger.info('Proxy stream completed', { providerId: provider.id, modelId }) onComplete?.() @@ -386,6 +409,9 @@ export async function streamToResponse(config: StreamConfig): Promise { logger.error('Error in proxy stream', error as Error, { providerId: provider.id, modelId }) onError?.(error) throw error + } finally { + response.off('close', handleDisconnect) + dispose() } } From b52afe075f8f279d90085275af932124936f3436 Mon Sep 17 00:00:00 2001 From: suyao Date: Sun, 4 Jan 2026 18:41:22 +0800 Subject: [PATCH 3/4] refactor: message processing to unify streaming and non-streaming handling --- src/main/apiServer/routes/chat.ts | 29 +-- src/main/apiServer/routes/messages.ts | 38 ++-- .../apiServer/services/ProxyStreamService.ts | 211 +++++------------- 3 files changed, 78 insertions(+), 200 deletions(-) diff --git a/src/main/apiServer/routes/chat.ts b/src/main/apiServer/routes/chat.ts index 999ad36312..4756075e9b 100644 --- a/src/main/apiServer/routes/chat.ts +++ b/src/main/apiServer/routes/chat.ts @@ -3,7 +3,7 @@ import express from 'express' import { loggerService } from '../../services/LoggerService' import type { ExtendedChatCompletionCreateParams } from '../adapters' -import { generateMessage, streamToResponse } from '../services/ProxyStreamService' +import { processMessage } from '../services/ProxyStreamService' import { validateModelId } from '../utils' const logger = loggerService.withContext('ApiServerChatRoutes') @@ -205,38 +205,15 @@ router.post('/completions', async (req: Request, res: Response) => { const provider = modelValidation.provider! const modelId = modelValidation.modelId! - const isStreaming = !!request.stream - if (isStreaming) { - try { - await streamToResponse({ - response: res, - provider, - modelId, - params: request, - inputFormat: 'openai', - outputFormat: 'openai' - }) - } catch (streamError) { - logger.error('Stream error', { error: streamError }) - // If headers weren't sent yet, return JSON error - if (!res.headersSent) { - const { status, body } = mapChatCompletionError(streamError) - return res.status(status).json(body) - } - // Otherwise the error is already handled by streamToResponse - } - return - } - - const response = await generateMessage({ + return processMessage({ + response: res, provider, modelId, params: request, inputFormat: 'openai', outputFormat: 'openai' }) - return res.json(response) } catch (error: unknown) { const { status, body } = mapChatCompletionError(error) return res.status(status).json(body) diff --git a/src/main/apiServer/routes/messages.ts b/src/main/apiServer/routes/messages.ts index 652d4f46bb..94171c6b3d 100644 --- a/src/main/apiServer/routes/messages.ts +++ b/src/main/apiServer/routes/messages.ts @@ -8,7 +8,7 @@ import express from 'express' import { approximateTokenSize } from 'tokenx' import { messagesService } from '../services/messages' -import { generateMessage, streamToResponse } from '../services/ProxyStreamService' +import { processMessage } from '../services/ProxyStreamService' import { getProviderById, isModelAnthropicCompatible, validateModelId } from '../utils' /** @@ -321,29 +321,19 @@ async function handleUnifiedProcessing({ providerId: provider.id }) - if (request.stream) { - await streamToResponse({ - response: res, - provider, - modelId: actualModelId, - params: request, - middlewares, - onError: (error) => { - logger.error('Stream error', error as Error) - }, - onComplete: () => { - logger.debug('Stream completed') - } - }) - } else { - const response = await generateMessage({ - provider, - modelId: actualModelId, - params: request, - middlewares - }) - res.json(response) - } + await processMessage({ + response: res, + provider, + modelId: actualModelId, + params: request, + middlewares, + onError: (error) => { + logger.error('Message error', error as Error) + }, + onComplete: () => { + logger.debug('Message completed') + } + }) } catch (error: any) { const { statusCode, errorResponse } = messagesService.transformError(error) res.status(statusCode).json(errorResponse) diff --git a/src/main/apiServer/services/ProxyStreamService.ts b/src/main/apiServer/services/ProxyStreamService.ts index 90149a617e..d5387c9f23 100644 --- a/src/main/apiServer/services/ProxyStreamService.ts +++ b/src/main/apiServer/services/ProxyStreamService.ts @@ -44,10 +44,6 @@ initializeSharedProviders({ error: (message, error) => logger.error(message, error) }) -// ============================================================================ -// Configuration Interfaces -// ============================================================================ - /** * Middleware type alias */ @@ -59,9 +55,9 @@ type LanguageModelMiddleware = LanguageModelV2Middleware type InputParams = InputParamsMap[InputFormat] /** - * Configuration for streaming message requests + * Configuration for message requests (both streaming and non-streaming) */ -export interface StreamConfig { +export interface MessageConfig { response: Response provider: Provider modelId: string @@ -74,19 +70,6 @@ export interface StreamConfig { plugins?: AiPlugin[] } -/** - * Configuration for non-streaming message generation - */ -export interface GenerateConfig { - provider: Provider - modelId: string - params: InputParams - inputFormat?: InputFormat - outputFormat?: OutputFormat - middlewares?: LanguageModelMiddleware[] - plugins?: AiPlugin[] -} - /** * Internal configuration for stream execution */ @@ -304,27 +287,14 @@ async function executeStream(config: ExecuteStreamConfig): Promise<{ return { adapter, outputStream } } -// ============================================================================ -// Public API -// ============================================================================ - /** - * Stream a message request and write to HTTP response + * Process a message request - handles both streaming and non-streaming * - * Uses TransformStream-based adapters for efficient streaming. - * - * @example - * ```typescript - * await streamToResponse({ - * response: res, - * provider, - * modelId: 'claude-3-opus', - * params: messageCreateParams, - * outputFormat: 'anthropic' - * }) - * ``` + * Automatically detects streaming mode from params.stream: + * - stream=true: SSE streaming response + * - stream=false: JSON response */ -export async function streamToResponse(config: StreamConfig): Promise { +export async function processMessage(config: MessageConfig): Promise { const { response, provider, @@ -338,7 +308,9 @@ export async function streamToResponse(config: StreamConfig): Promise { plugins = [] } = config - logger.info('Starting proxy stream', { + const isStreaming = 'stream' in params && params.stream === true + + logger.info(`Starting ${isStreaming ? 'streaming' : 'non-streaming'} message`, { providerId: provider.id, providerType: provider.type, modelId, @@ -348,112 +320,21 @@ export async function streamToResponse(config: StreamConfig): Promise { pluginCount: plugins.length }) - // Create abort controller for client disconnect handling - const streamController = createStreamAbortController({ - timeoutMs: LONG_POLL_TIMEOUT_MS - }) + // Create abort controller with timeout + const streamController = createStreamAbortController({ timeoutMs: LONG_POLL_TIMEOUT_MS }) const { abortController, dispose } = streamController - // Handle client disconnect const handleDisconnect = () => { if (abortController.signal.aborted) return - logger.info('Client disconnected, aborting stream', { providerId: provider.id, modelId }) + logger.info('Client disconnected, aborting', { providerId: provider.id, modelId }) abortController.abort('Client disconnected') } response.on('close', handleDisconnect) try { - // Set SSE headers - response.setHeader('Content-Type', 'text/event-stream') - response.setHeader('Cache-Control', 'no-cache') - response.setHeader('Connection', 'keep-alive') - response.setHeader('X-Accel-Buffering', 'no') - - const { outputStream } = await executeStream({ - provider, - modelId, - params, - inputFormat, - outputFormat, - middlewares, - plugins, - abortSignal: abortController.signal - }) - - // Get formatter for the output format - const formatter = StreamAdapterFactory.getFormatter(outputFormat) - - // Stream events to response - const reader = outputStream.getReader() - try { - while (true) { - const { done, value } = await reader.read() - if (done) break - if (response.writableEnded) break - response.write(formatter.formatEvent(value)) - } - } finally { - reader.releaseLock() - } - - // Send done marker and end response - if (!response.writableEnded) { - response.write(formatter.formatDone()) - response.end() - } - - logger.info('Proxy stream completed', { providerId: provider.id, modelId }) - onComplete?.() - } catch (error) { - logger.error('Error in proxy stream', error as Error, { providerId: provider.id, modelId }) - onError?.(error) - throw error - } finally { - response.off('close', handleDisconnect) - dispose() - } -} - -/** - * Generate a non-streaming message response - * - * Uses simulateStreamingMiddleware to reuse the same streaming logic. - * - * @example - * ```typescript - * const message = await generateMessage({ - * provider, - * modelId: 'claude-3-opus', - * params: messageCreateParams, - * outputFormat: 'anthropic' - * }) - * ``` - */ -export async function generateMessage(config: GenerateConfig): Promise { - const { - provider, - modelId, - params, - inputFormat = 'anthropic', - outputFormat = 'anthropic', - middlewares = [], - plugins = [] - } = config - - logger.info('Starting message generation', { - providerId: provider.id, - providerType: provider.type, - modelId, - inputFormat, - outputFormat, - middlewareCount: middlewares.length, - pluginCount: plugins.length - }) - - try { - // Add simulateStreamingMiddleware to reuse streaming logic - const allMiddlewares = [simulateStreamingMiddleware(), ...middlewares] + // For non-streaming, add simulateStreamingMiddleware + const allMiddlewares = isStreaming ? middlewares : [simulateStreamingMiddleware(), ...middlewares] const { adapter, outputStream } = await executeStream({ provider, @@ -462,30 +343,60 @@ export async function generateMessage(config: GenerateConfig): Promise inputFormat, outputFormat, middlewares: allMiddlewares, - plugins + plugins, + abortSignal: abortController.signal }) - // Consume the stream to populate adapter state - const reader = outputStream.getReader() - while (true) { - const { done } = await reader.read() - if (done) break + if (isStreaming) { + // Streaming: Set SSE headers and stream events + response.setHeader('Content-Type', 'text/event-stream') + response.setHeader('Cache-Control', 'no-cache') + response.setHeader('Connection', 'keep-alive') + response.setHeader('X-Accel-Buffering', 'no') + + const formatter = StreamAdapterFactory.getFormatter(outputFormat) + const reader = outputStream.getReader() + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + if (response.writableEnded) break + response.write(formatter.formatEvent(value)) + } + } finally { + reader.releaseLock() + } + + if (!response.writableEnded) { + response.write(formatter.formatDone()) + response.end() + } + } else { + // Non-streaming: Consume stream and return JSON + const reader = outputStream.getReader() + while (true) { + const { done } = await reader.read() + if (done) break + } + reader.releaseLock() + + const finalResponse = adapter.buildNonStreamingResponse() + response.json(finalResponse) } - reader.releaseLock() - // Build final response from adapter - const finalResponse = adapter.buildNonStreamingResponse() - - logger.info('Message generation completed', { providerId: provider.id, modelId }) - - return finalResponse + logger.info('Message completed', { providerId: provider.id, modelId, streaming: isStreaming }) + onComplete?.() } catch (error) { - logger.error('Error in message generation', error as Error, { providerId: provider.id, modelId }) + logger.error('Error in message processing', error as Error, { providerId: provider.id, modelId }) + onError?.(error) throw error + } finally { + response.off('close', handleDisconnect) + dispose() } } export default { - streamToResponse, - generateMessage + processMessage } From 06d4e286b3df845516c2a0374b7ec5bb3f07d2ee Mon Sep 17 00:00:00 2001 From: suyao Date: Sun, 4 Jan 2026 18:55:22 +0800 Subject: [PATCH 4/4] chore: remove outdated comments regarding providerType handling --- src/main/apiServer/services/models.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/apiServer/services/models.ts b/src/main/apiServer/services/models.ts index b72c21b1e1..a736292c3a 100644 --- a/src/main/apiServer/services/models.ts +++ b/src/main/apiServer/services/models.ts @@ -15,11 +15,6 @@ export class ModelsService { const providers = await getAvailableProviders() - // Note: When providerType === 'anthropic', we now return ALL available models - // because the API Server's unified adapter (AiSdkToAnthropicSSE) can convert - // any provider's response to Anthropic SSE format. This enables Claude Code Agent - // to work with OpenAI, Gemini, and other providers transparently. - const models = await listAllAvailableModels(providers) // Use Map to deduplicate models by their full ID (provider:model_id) const uniqueModels = new Map()