diff --git a/src/main/apiServer/routes/agents/handlers/messages.ts b/src/main/apiServer/routes/agents/handlers/messages.ts index bd1ef5bc42..41a3bf44d6 100644 --- a/src/main/apiServer/routes/agents/handlers/messages.ts +++ b/src/main/apiServer/routes/agents/handlers/messages.ts @@ -1,4 +1,5 @@ import { loggerService } from '@logger' +import { AgentStreamEvent } from '@main/services/agents/interfaces/AgentStreamInterface' import { Request, Response } from 'express' import { agentService, sessionMessageService, sessionService } from '../../../../services/agents' @@ -42,13 +43,14 @@ export const createMessage = async (req: Request, res: Response): Promise res.setHeader('Access-Control-Allow-Origin', '*') res.setHeader('Access-Control-Allow-Headers', 'Cache-Control') - const messageStream = sessionMessageService.createSessionMessage(session, messageData) + const abortController = new AbortController() + const messageStream = sessionMessageService.createSessionMessage(session, messageData, abortController) // Track stream lifecycle so we keep the SSE connection open until persistence finishes let responseEnded = false let streamFinished = false let awaitingPersistence = false - let persistenceResolved = false + const persistenceResolved = false const finalizeResponse = () => { if (responseEnded) { @@ -73,15 +75,39 @@ export const createMessage = async (req: Request, res: Response): Promise res.end() } - // Handle client disconnect - req.on('close', () => { + /** + * Client Disconnect Detection for Server-Sent Events (SSE) + * + * We monitor multiple HTTP events to reliably detect when a client disconnects + * from the streaming response. This is crucial for: + * - Aborting long-running Claude Code processes + * - Cleaning up resources and preventing memory leaks + * - Avoiding orphaned processes + * + * Event Priority & Behavior: + * 1. res.on('close') - Most common for SSE client disconnects (browser tab close, curl Ctrl+C) + * 2. req.on('aborted') - Explicit request abortion + * 3. req.on('close') - Request object closure (less common with SSE) + * + * When any disconnect event fires, we: + * - Abort the Claude Code SDK process via abortController + * - Clean up event listeners to prevent memory leaks + * - Mark the response as ended to prevent further writes + */ + const handleDisconnect = () => { + if (responseEnded) return logger.info(`Client disconnected from streaming message for session: ${sessionId}`) responseEnded = true messageStream.removeAllListeners() - }) + abortController.abort('Client disconnected') + } + + req.on('close', handleDisconnect) + req.on('aborted', handleDisconnect) + res.on('close', handleDisconnect) // Handle stream events - messageStream.on('data', (event: any) => { + messageStream.on('data', (event: AgentStreamEvent) => { if (responseEnded) return try { @@ -101,12 +127,6 @@ export const createMessage = async (req: Request, res: Response): Promise logger.error(`Streaming message error for session: ${sessionId}:`, event.error) streamFinished = true - awaitingPersistence = Boolean(event.persistScheduled) - - if (!awaitingPersistence) { - persistenceResolved = true - } - finalizeResponse() break } @@ -121,23 +141,13 @@ export const createMessage = async (req: Request, res: Response): Promise break } - case 'persisted': - // Send persistence success event - // res.write(`data: ${JSON.stringify(event)}\n\n`) - logger.debug(`Session message persisted for session: ${sessionId}`, { messageId: event.message?.id }) - - persistenceResolved = true - finalizeResponse() - break - - case 'persist-error': - // Send persistence error event - // res.write(`data: ${JSON.stringify(event)}\n\n`) - logger.error(`Failed to persist session message for session: ${sessionId}:`, event.error) - - persistenceResolved = true + case 'cancelled': { + logger.info(`Streaming message cancelled for session: ${sessionId}`) + // res.write(`data: ${JSON.stringify({ type: 'cancelled' })}\n\n`) + streamFinished = true finalizeResponse() break + } default: // Handle other event types as generic data @@ -199,8 +209,8 @@ export const createMessage = async (req: Request, res: Response): Promise res.end() } }, - 5 * 60 * 1000 - ) // 5 minutes timeout + 10 * 60 * 1000 + ) // 10 minutes timeout // Clear timeout when response ends res.on('close', () => clearTimeout(timeout)) diff --git a/src/main/services/agents/interfaces/AgentStreamInterface.ts b/src/main/services/agents/interfaces/AgentStreamInterface.ts index 2cb82c4942..245224248a 100644 --- a/src/main/services/agents/interfaces/AgentStreamInterface.ts +++ b/src/main/services/agents/interfaces/AgentStreamInterface.ts @@ -8,11 +8,9 @@ import { UIMessageChunk } from 'ai' // Generic agent stream event that works with any agent type export interface AgentStreamEvent { - type: 'chunk' | 'error' | 'complete' + type: 'chunk' | 'error' | 'complete' | 'cancelled' chunk?: UIMessageChunk // Standard AI SDK chunk for UI consumption - rawAgentMessage?: any // Agent-specific raw message (SDKMessage for Claude Code, different for other agents) error?: Error - agentResult?: any // Agent-specific result data } // Agent stream interface that all agents should implement @@ -24,5 +22,10 @@ export interface AgentStream extends EventEmitter { // Base agent service interface export interface AgentServiceInterface { - invoke(prompt: string, session: GetAgentSessionResponse, lastAgentSessionId?: string): Promise + invoke( + prompt: string, + session: GetAgentSessionResponse, + abortController: AbortController, + lastAgentSessionId?: string + ): Promise } diff --git a/src/main/services/agents/services/SessionMessageService.ts b/src/main/services/agents/services/SessionMessageService.ts index f83bff5756..e50c4e8ffc 100644 --- a/src/main/services/agents/services/SessionMessageService.ts +++ b/src/main/services/agents/services/SessionMessageService.ts @@ -170,14 +170,18 @@ export class SessionMessageService extends BaseService { return { messages } } - createSessionMessage(session: GetAgentSessionResponse, messageData: CreateSessionMessageRequest): EventEmitter { + createSessionMessage( + session: GetAgentSessionResponse, + messageData: CreateSessionMessageRequest, + abortController: AbortController + ): EventEmitter { this.ensureInitialized() // Create a new EventEmitter to manage the session message lifecycle const sessionStream = new EventEmitter() // No parent validation needed, start immediately - this.startSessionMessageStream(session, messageData, sessionStream) + this.startSessionMessageStream(session, messageData, sessionStream, abortController) return sessionStream } @@ -185,7 +189,8 @@ export class SessionMessageService extends BaseService { private async startSessionMessageStream( session: GetAgentSessionResponse, req: CreateSessionMessageRequest, - sessionStream: EventEmitter + sessionStream: EventEmitter, + abortController: AbortController ): Promise { const agentSessionId = await this.getLastAgentSessionId(session.id) let newAgentSessionId = '' @@ -198,7 +203,7 @@ export class SessionMessageService extends BaseService { } // Create the streaming agent invocation (using invokeStream for streaming) - const claudeStream = await this.cc.invoke(req.content, session, agentSessionId) + const claudeStream = await this.cc.invoke(req.content, session, abortController, agentSessionId) // Use chunk accumulator to manage streaming data const accumulator = new ChunkAccumulator() diff --git a/src/main/services/agents/services/claudecode/index.ts b/src/main/services/agents/services/claudecode/index.ts index 7e76c5417f..fe45f7edd3 100644 --- a/src/main/services/agents/services/claudecode/index.ts +++ b/src/main/services/agents/services/claudecode/index.ts @@ -14,15 +14,6 @@ import { transformSDKMessageToUIChunk } from './transform' const require_ = createRequire(import.meta.url) const logger = loggerService.withContext('ClaudeCodeService') -interface ClaudeCodeResult { - success: boolean - stdout: string - stderr: string - jsonOutput: any[] - error?: Error - exitCode?: number -} - class ClaudeCodeStream extends EventEmitter implements AgentStream { declare emit: (event: 'data', data: AgentStreamEvent) => boolean declare on: (event: 'data', listener: (data: AgentStreamEvent) => void) => this @@ -37,7 +28,12 @@ class ClaudeCodeService implements AgentServiceInterface { this.claudeExecutablePath = require_.resolve('@anthropic-ai/claude-code/cli.js') } - async invoke(prompt: string, session: GetAgentSessionResponse, lastAgentSessionId?: string): Promise { + async invoke( + prompt: string, + session: GetAgentSessionResponse, + abortController: AbortController, + lastAgentSessionId?: string + ): Promise { const aiStream = new ClaudeCodeStream() // Validate session accessible paths and make sure it exists as a directory @@ -76,6 +72,7 @@ class ClaudeCodeService implements AgentServiceInterface { // Build SDK options from parameters const options: Options = { + abortController, cwd, pathToClaudeCodeExecutable: this.claudeExecutablePath, stderr: (chunk: string) => { @@ -164,8 +161,7 @@ class ClaudeCodeService implements AgentServiceInterface { for (const chunk of chunks) { stream.emit('data', { type: 'chunk', - chunk, - rawAgentMessage: message + chunk }) } } @@ -179,57 +175,44 @@ class ClaudeCodeService implements AgentServiceInterface { messageCount: jsonOutput.length }) - const result: ClaudeCodeResult = { - success: true, - stdout: '', - stderr: '', - jsonOutput, - exitCode: 0 - } - // Emit completion event stream.emit('data', { - type: 'complete', - agentResult: { - ...result, - rawSDKMessages: jsonOutput, - agentType: 'claude-code' - } + type: 'complete' }) } catch (error) { if (hasCompleted) return hasCompleted = true const duration = Date.now() - startTime + + // Check if this is an abort error + const errorObj = error as any + const isAborted = + errorObj?.name === 'AbortError' || + errorObj?.message?.includes('aborted') || + options.abortController?.signal.aborted + + if (isAborted) { + logger.info('SDK query aborted by client disconnect', { duration }) + // Simply cleanup and return - don't emit error events + stream.emit('data', { + type: 'cancelled', + error: new Error('Request aborted by client') + }) + return + } + + // Original error handling for non-abort errors logger.error('SDK query error:', { - error: error instanceof Error ? error.message : String(error), + error: errorObj instanceof Error ? errorObj.message : String(errorObj), duration, messageCount: jsonOutput.length }) - const result: ClaudeCodeResult = { - success: false, - stdout: '', - stderr: error instanceof Error ? error.message : String(error), - jsonOutput, - error: error instanceof Error ? error : new Error(String(error)), - exitCode: 1 - } - // Emit error event stream.emit('data', { type: 'error', - error: error instanceof Error ? error : new Error(String(error)) - }) - - // Emit completion with error result - stream.emit('data', { - type: 'complete', - agentResult: { - ...result, - rawSDKMessages: jsonOutput, - agentType: 'claude-code' - } + error: errorObj instanceof Error ? errorObj : new Error(String(errorObj)) }) } }