From 6f523f31cc14846030555691fa23a60c4eb5d7d8 Mon Sep 17 00:00:00 2001 From: akitasummer Date: Tue, 7 Apr 2026 10:56:10 +0800 Subject: [PATCH] fix: mcp stream timeout --- tegg/plugin/mcp-proxy/src/index.ts | 34 ++++++++++++++++++------------ 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/tegg/plugin/mcp-proxy/src/index.ts b/tegg/plugin/mcp-proxy/src/index.ts index a89d29c1c0..8c07231172 100644 --- a/tegg/plugin/mcp-proxy/src/index.ts +++ b/tegg/plugin/mcp-proxy/src/index.ts @@ -3,6 +3,7 @@ import cluster from 'node:cluster'; import querystring from 'node:querystring'; import { Readable } from 'node:stream'; import url from 'node:url'; +import { fetch, Agent } from 'undici'; import { MCPControllerRegister } from '@eggjs/controller-plugin/lib/impl/mcp/MCPControllerRegister'; import type { MCPControllerHook } from '@eggjs/controller-plugin/lib/impl/mcp/MCPControllerRegister'; @@ -343,11 +344,10 @@ export class MCPProxyApiClient extends APIClientBase { ctx.req.headers['mcp-proxy-type'] = action; ctx.req.headers['mcp-proxy-sessionid'] = sessionId; const resp = await fetch(`http://localhost:${detail.port}/mcp/message?sessionId=${sessionId}`, { - // dispatcher: new Agent({ - // connect: { - // socketPath, - // }, - // }), + dispatcher: new Agent({ + bodyTimeout: 0, + headersTimeout: 0, + }), headers: ctx.req.headers as unknown as Record, body: body as string, method: ctx.req.method, @@ -389,11 +389,10 @@ export class MCPProxyApiClient extends APIClientBase { ctx.req.headers['mcp-proxy-type'] = action; ctx.req.headers['mcp-proxy-sessionid'] = sessionId; const response = await fetch(`http://localhost:${detail.port}`, { - // dispatcher: new Agent({ - // connect: { - // socketPath, - // }, - // }), + dispatcher: new Agent({ + bodyTimeout: 0, + headersTimeout: 0, + }), headers: ctx.req.headers as unknown as Record, method: ctx.req.method, ...(ctx.req.method !== 'GET' @@ -413,7 +412,14 @@ export class MCPProxyApiClient extends APIClientBase { } ctx.set(headers); ctx.res.statusCode = response.status; - Readable.fromWeb(response.body! as any).pipe(ctx.res); + const readable = Readable.fromWeb(response.body!); + readable.on('error', err => { + this.logger.error('[mcp-proxy] stream proxy error: %s', err.message); + if (!ctx.res.writableEnded) { + ctx.res.end(); + } + }); + readable.pipe(ctx.res); break; } } @@ -464,8 +470,10 @@ export class MCPProxyApiClient extends APIClientBase { ctx.res.write('event: terminate'); } catch (error) { ctx.res.statusCode = 500; - ctx.res.write(`see stream error ${error}`); - ctx.res.end(); + if (!ctx.res.writableEnded) { + ctx.res.statusCode = 500; + ctx.res.end(); + } } }; processStream();