Conversation
📝 WalkthroughWalkthroughModified stream proxying and error handling in the MCP proxy service. Added Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Deploying egg-v3 with
|
| Latest commit: |
6f523f3
|
| Status: | ✅ Deploy successful! |
| Preview URL: | https://fa38f566.egg-v3.pages.dev |
| Branch Preview URL: | https://fix-stream-timeout.egg-v3.pages.dev |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## next #5860 +/- ##
=======================================
Coverage 85.28% 85.28%
=======================================
Files 666 666
Lines 13247 13247
Branches 1538 1538
=======================================
Hits 11298 11298
Misses 1818 1818
Partials 131 131 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Code Review
This pull request replaces the default fetch with undici, configures Agent timeouts, and enhances error handling for proxied streams. The review feedback recommends optimizing performance by reusing a single Agent instance for connection pooling, adding safety checks for response.body to prevent runtime errors, and refining SSE error handling to avoid redundant status code assignments and potential header conflicts.
| import querystring from 'node:querystring'; | ||
| import { Readable } from 'node:stream'; | ||
| import url from 'node:url'; | ||
| import { fetch, Agent } from 'undici'; |
There was a problem hiding this comment.
To improve performance and enable connection pooling, it is recommended to create a single Agent instance and reuse it across requests instead of instantiating a new one for every fetch call.
| import { fetch, Agent } from 'undici'; | |
| import { fetch, Agent } from 'undici'; | |
| const proxyAgent = new Agent({ | |
| bodyTimeout: 0, | |
| headersTimeout: 0, | |
| }); |
| dispatcher: new Agent({ | ||
| bodyTimeout: 0, | ||
| headersTimeout: 0, | ||
| }), |
| dispatcher: new Agent({ | ||
| bodyTimeout: 0, | ||
| headersTimeout: 0, | ||
| }), |
| ctx.set(headers); | ||
| ctx.res.statusCode = response.status; | ||
| Readable.fromWeb(response.body! as any).pipe(ctx.res); | ||
| const readable = Readable.fromWeb(response.body!); |
There was a problem hiding this comment.
The use of the non-null assertion operator (!) on response.body could lead to a runtime error if the response has no body (e.g., a 204 No Content response). It is safer to check if the body exists before attempting to create a readable stream from it.
if (!response.body) {
ctx.res.end();
break;
}
const readable = Readable.fromWeb(response.body);| ctx.res.statusCode = 500; | ||
| ctx.res.write(`see stream error ${error}`); | ||
| ctx.res.end(); | ||
| if (!ctx.res.writableEnded) { | ||
| ctx.res.statusCode = 500; | ||
| ctx.res.end(); | ||
| } |
There was a problem hiding this comment.
The status code is being set redundantly. Furthermore, attempting to set the status code after headers have been sent (which is likely in a streaming response) will result in errors or warnings. It is also good practice to log the error for observability.
this.logger.error('[mcp-proxy] sse stream error: %s', error);
if (!ctx.res.headersSent) {
ctx.res.statusCode = 500;
}
if (!ctx.res.writableEnded) {
ctx.res.end();
}There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
tegg/plugin/mcp-proxy/src/index.ts (1)
347-354: Consider reusing a singleAgentinstance instead of creating one per request.Creating a new
Agentfor every fetch call is inefficient.undiciAgents manage connection pools and are designed to be reused. Under load, this pattern may lead to unnecessary resource allocation and connection churn.♻️ Proposed fix: Create a shared Agent at class or module level
Add a module-level constant or class property:
+const mcpProxyAgent = new Agent({ + bodyTimeout: 0, + headersTimeout: 0, +}); + export class MCPProxyApiClient extends APIClientBase {Then reuse it in fetch calls:
const resp = await fetch(`http://localhost:${detail.port}/mcp/message?sessionId=${sessionId}`, { - dispatcher: new Agent({ - bodyTimeout: 0, - headersTimeout: 0, - }), + dispatcher: mcpProxyAgent, headers: ctx.req.headers as unknown as Record<string, string>,🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tegg/plugin/mcp-proxy/src/index.ts` around lines 347 - 354, The code currently creates a new undici Agent for each fetch call by instantiating Agent in the dispatcher option; instead, declare a single shared Agent (e.g., a module-level or class-level constant named agent or sharedAgent) and reuse it for all requests. Update the fetch/options creation (where dispatcher: new Agent({ bodyTimeout: 0, headersTimeout: 0 })) to reference that shared Agent instance and ensure its lifetime covers the process so connection pooling is preserved; keep the existing timeout options when initializing the shared Agent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tegg/plugin/mcp-proxy/src/index.ts`:
- Around line 471-477: The catch block sets ctx.res.statusCode = 500 twice and
may set status on a closed response; remove the first unconditional assignment
and move status setting inside the writableEnded guard so only when
!ctx.res.writableEnded you set ctx.res.statusCode = 500 and then call
ctx.res.end(); update the catch block around the existing ctx and res usage to
only change statusCode when the response is writable (reference the catch block
handling ctx.res and the writableEnded check).
- Around line 415-422: The code uses Readable.fromWeb(response.body!) which will
throw if response.body is null; update the proxy logic in the handler where
Readable.fromWeb is called to first check if response.body is null/undefined
(and also check response.status === 204/no-content), and if so skip creating the
Readable, set appropriate headers/status on ctx.res and call ctx.res.end();
otherwise create the Readable from response.body, attach the 'error' handler and
pipe to ctx.res as before (referencing response, Readable.fromWeb, readable and
ctx.res in the same block).
---
Nitpick comments:
In `@tegg/plugin/mcp-proxy/src/index.ts`:
- Around line 347-354: The code currently creates a new undici Agent for each
fetch call by instantiating Agent in the dispatcher option; instead, declare a
single shared Agent (e.g., a module-level or class-level constant named agent or
sharedAgent) and reuse it for all requests. Update the fetch/options creation
(where dispatcher: new Agent({ bodyTimeout: 0, headersTimeout: 0 })) to
reference that shared Agent instance and ensure its lifetime covers the process
so connection pooling is preserved; keep the existing timeout options when
initializing the shared Agent.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 1b055c66-e75f-43b0-9967-a746a07405fd
📒 Files selected for processing (1)
tegg/plugin/mcp-proxy/src/index.ts
| const readable = Readable.fromWeb(response.body!); | ||
| readable.on('error', err => { | ||
| this.logger.error('[mcp-proxy] stream proxy error: %s', err.message); | ||
| if (!ctx.res.writableEnded) { | ||
| ctx.res.end(); | ||
| } | ||
| }); | ||
| readable.pipe(ctx.res); |
There was a problem hiding this comment.
Guard against null response body before converting to Readable.
The non-null assertion response.body! is unsafe. If the upstream server returns a response without a body (e.g., certain error responses or 204 No Content), response.body will be null and Readable.fromWeb(null) will throw a runtime error.
🛡️ Proposed fix: Add null check
ctx.res.statusCode = response.status;
+ if (!response.body) {
+ ctx.res.end();
+ break;
+ }
const readable = Readable.fromWeb(response.body!);
readable.on('error', err => {
this.logger.error('[mcp-proxy] stream proxy error: %s', err.message);🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tegg/plugin/mcp-proxy/src/index.ts` around lines 415 - 422, The code uses
Readable.fromWeb(response.body!) which will throw if response.body is null;
update the proxy logic in the handler where Readable.fromWeb is called to first
check if response.body is null/undefined (and also check response.status ===
204/no-content), and if so skip creating the Readable, set appropriate
headers/status on ctx.res and call ctx.res.end(); otherwise create the Readable
from response.body, attach the 'error' handler and pipe to ctx.res as before
(referencing response, Readable.fromWeb, readable and ctx.res in the same
block).
| } catch (error) { | ||
| ctx.res.statusCode = 500; | ||
| ctx.res.write(`see stream error ${error}`); | ||
| ctx.res.end(); | ||
| if (!ctx.res.writableEnded) { | ||
| ctx.res.statusCode = 500; | ||
| ctx.res.end(); | ||
| } | ||
| } |
There was a problem hiding this comment.
Remove redundant statusCode assignment.
ctx.res.statusCode = 500 is set twice: unconditionally on line 472 and again inside the if block on line 474. If the response is already ended (writableEnded is true), setting statusCode on line 472 has no effect and may even throw in some Node.js versions. Move the assignment inside the guard.
🧹 Proposed fix
} catch (error) {
- ctx.res.statusCode = 500;
if (!ctx.res.writableEnded) {
ctx.res.statusCode = 500;
ctx.res.end();
}
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| } catch (error) { | |
| ctx.res.statusCode = 500; | |
| ctx.res.write(`see stream error ${error}`); | |
| ctx.res.end(); | |
| if (!ctx.res.writableEnded) { | |
| ctx.res.statusCode = 500; | |
| ctx.res.end(); | |
| } | |
| } | |
| } catch (error) { | |
| if (!ctx.res.writableEnded) { | |
| ctx.res.statusCode = 500; | |
| ctx.res.end(); | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tegg/plugin/mcp-proxy/src/index.ts` around lines 471 - 477, The catch block
sets ctx.res.statusCode = 500 twice and may set status on a closed response;
remove the first unconditional assignment and move status setting inside the
writableEnded guard so only when !ctx.res.writableEnded you set
ctx.res.statusCode = 500 and then call ctx.res.end(); update the catch block
around the existing ctx and res usage to only change statusCode when the
response is writable (reference the catch block handling ctx.res and the
writableEnded check).
Summary by CodeRabbit