Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
3c528be
增加采样功能
walterlv Apr 3, 2026
57c1d0b
简化代码,添加采样功能的单元测试
walterlv Apr 3, 2026
c1f4f2f
添加人工测试的 MCP 采样工具
walterlv Apr 3, 2026
27efe1b
解决采样协议不被客户端认识的问题
walterlv Apr 3, 2026
c432af1
整理采样异常
walterlv Apr 3, 2026
e56fb0a
调整一些代码
walterlv Apr 7, 2026
b069d80
处理审查
walterlv Apr 7, 2026
e2c67fe
处理审查
walterlv Apr 7, 2026
da9028c
整理传输层读取请求或响应的分支
walterlv Apr 7, 2026
3db477f
避免不应该的双语注释
walterlv Apr 7, 2026
694a602
添加更多的调试日志
walterlv Apr 8, 2026
9acc4e6
兼容 MCP Inspector 的采样拒绝
walterlv Apr 8, 2026
3010535
记录客户端错误
walterlv Apr 8, 2026
e2eba44
修复指令中的协议版本
walterlv Apr 8, 2026
8dceafe
AI 增加的屎山代码:修复 SSE 消息流向
walterlv Apr 8, 2026
733ca8c
简化传输层的实现
walterlv Apr 8, 2026
c6292f6
处理遗留的双语问题
walterlv Apr 8, 2026
10002e6
重构以简化传输层的代码
walterlv Apr 8, 2026
128aa76
更新文档记录
walterlv Apr 8, 2026
5963987
处理审查意见
walterlv Apr 8, 2026
52670d8
处理审查意见
walterlv Apr 8, 2026
4b18f12
处理审查意见
walterlv Apr 8, 2026
3933fbe
处理审查意见
walterlv Apr 8, 2026
19a5117
处理审查意见:修复并发竞态、重复请求ID、SSE心跳、以及初始化请求分类等问题
Copilot Apr 8, 2026
c96610f
调整心跳时间
walterlv Apr 8, 2026
9888474
处理审查意见:协议版本运算符比较、SSE并发写互斥、重复请求ID、双语注释、ToArray优化
Copilot Apr 8, 2026
11e92fb
处理审查意见
walterlv Apr 8, 2026
a855395
修复 ServerTransportManager:PipeReader 资源释放和大小写不敏感属性查找
Copilot Apr 9, 2026
00bed63
使用更简单的 JsonElement,不需要 PipeReader
walterlv Apr 9, 2026
71549d1
我觉得没必要为了这个不规范的写法浪费性能
walterlv Apr 9, 2026
52f6e76
处理审查意见
walterlv Apr 9, 2026
0710443
添加辅助传输层日志记录的类(避免日志打太多)
walterlv Apr 9, 2026
9b5cd10
统一使用受管理的传输层原始日志记录,避免日志大幅影响控制台输出
walterlv Apr 9, 2026
bbb20b0
尝试消除竞态条件
walterlv Apr 9, 2026
3ba6c05
为 GET 添加日志
walterlv Apr 9, 2026
6032afa
记录更详细的原始日志
walterlv Apr 10, 2026
b2f9f65
ToString 可以输出文本内容
walterlv Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace DotNetCampus.SampleMcpServer.McpTools;

public class SampleTool
public class EchoDelayTool
{
/// <summary>
/// 用于给 AI 调试使用的工具,原样返回一些信息
Expand Down
57 changes: 57 additions & 0 deletions samples/DotNetCampus.SampleMcpServer/McpTools/SamplingTool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using DotNetCampus.ModelContextProtocol.CompilerServices;
using DotNetCampus.ModelContextProtocol.Exceptions;
using DotNetCampus.ModelContextProtocol.Protocol.Messages;
using DotNetCampus.ModelContextProtocol.Servers;

namespace DotNetCampus.SampleMcpServer.McpTools;

public class SamplingTool
{
/// <summary>
/// 通过客户端的 LLM 进行采样,将 prompt 发送给客户端,获取 LLM 响应并返回。
/// 用于人工验证 sampling/createMessage 协议流程是否正常。
/// </summary>
/// <param name="prompt">发送给 LLM 的提示词</param>
/// <param name="maxTokens">最大生成令牌数</param>
/// <param name="systemPrompt">可选的系统提示词</param>
/// <param name="context">MCP 工具上下文</param>
[McpServerTool]
public async Task<CallToolResult> AskLlm(
string prompt,
int maxTokens = 1024,
string? systemPrompt = null,
IMcpServerCallToolContext context = null!)
Comment thread
walterlv marked this conversation as resolved.
Outdated
{
if (!context.Sampling.IsSupported)
{
return CallToolResult.FromError(
"当前客户端未声明 Sampling 能力。请确保客户端支持 sampling/createMessage 请求。\n" +
"The connected client has not declared Sampling capability.");
}

try
{
var result = await context.Sampling.CreateMessageAsync(prompt, maxTokens, systemPrompt, context.CancellationToken);

var responseText = result.Content switch
{
TextContentBlock text => text.Text,
_ => $"[Non-text content: {result.Content?.GetType().Name}]",
};

return $"""
Model: {result.Model}
StopReason: {result.StopReason ?? "unknown"}
Role: {result.Role}
---
{responseText}
""";
}
catch (McpSamplingRejectedException ex)
{
return CallToolResult.FromError(
$"采样请求被用户拒绝。Sampling request was rejected by the user.\n" +
$"Code: {ex.ErrorCode}, Message: {ex.RejectionMessage}");
}
}
}
3 changes: 2 additions & 1 deletion samples/DotNetCampus.SampleMcpServer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ private static async Task Main(string[] args)
.WithRequestHandlers(s => new CustomRequestHandlers(s))
.WithJsonSerializer(McpToolJsonContext.Default)
.WithTools(t => t
.WithTool(() => new SampleTool())
.WithTool(() => new EchoDelayTool())
.WithTool(() => new InputTool())
.WithTool(() => new OutputTool())
.WithTool(() => new PolymorphicTool())
.WithTool(() => new ResourceTool())
.WithTool(() => new SamplingTool())
)
.WithResources(r => r
.WithResource(() => new SampleResource())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using dotnetCampus.Ipc.Pipes;
using System.Collections.Concurrent;
using dotnetCampus.Ipc.Pipes;
using DotNetCampus.ModelContextProtocol.Protocol.Messages;
using DotNetCampus.ModelContextProtocol.Protocol.Messages.JsonRpc;

namespace DotNetCampus.ModelContextProtocol.Transports.Ipc;
Expand All @@ -8,6 +10,9 @@ namespace DotNetCampus.ModelContextProtocol.Transports.Ipc;
/// </summary>
public class IpcServerTransportSession : IServerTransportSession
{
private readonly ConcurrentDictionary<string, TaskCompletionSource<JsonRpcResponse>> _pendingRequests = [];
private PeerProxy? _peer;

/// <summary>
/// 创建 DotNetCampus.Ipc 传输层的一个会话。
/// </summary>
Expand All @@ -22,15 +27,75 @@ public IpcServerTransportSession(string sessionId)
/// </summary>
public string SessionId { get; }

/// <inheritdoc />
public ClientCapabilities? ConnectedClientCapabilities { get; set; }

/// <summary>
/// 设置与此会话关联的 IPC 对端代理,用于 SendRequestAsync 发送消息。
/// </summary>
internal void SetPeer(PeerProxy peer)
{
_peer = peer;
}
Comment thread
walterlv marked this conversation as resolved.

/// <inheritdoc />
public Task SendMessageAsync(JsonRpcMessage message, CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}

/// <inheritdoc />
public async Task<JsonRpcResponse> SendRequestAsync(JsonRpcRequest request, CancellationToken cancellationToken = default)
{
if (request.Id?.ToString() is not { } id)
{
throw new InvalidOperationException("请求 ID 不能为 null。Request ID must not be null.");
}

var tcs = new TaskCompletionSource<JsonRpcResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
_pendingRequests[id] = tcs;

using var registration = cancellationToken.Register(() =>
{
if (_pendingRequests.TryRemove(id, out var removed))
{
removed.TrySetCanceled(cancellationToken);
}
});

try
{
await SendMessageAsync(request, cancellationToken).ConfigureAwait(false);
return await tcs.Task.ConfigureAwait(false);
}
finally
{
_pendingRequests.TryRemove(id, out _);
}
Comment thread
walterlv marked this conversation as resolved.
Outdated
}

/// <inheritdoc />
public void HandleResponseAsync(JsonRpcResponse response)
{
if (response.Id?.ToString() is not { } id)
{
return;
}

if (_pendingRequests.TryRemove(id, out var tcs))
{
tcs.TrySetResult(response);
}
}

/// <inheritdoc />
public ValueTask DisposeAsync()
{
foreach (var (_, tcs) in _pendingRequests)
{
tcs.TrySetCanceled();
}
_pendingRequests.Clear();
return ValueTask.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using DotNetCampus.ModelContextProtocol.Protocol;
using DotNetCampus.ModelContextProtocol.Protocol.Messages.JsonRpc;
using DotNetCampus.ModelContextProtocol.Servers;
using DotNetCampus.ModelContextProtocol.Transports;
using DotNetCampus.ModelContextProtocol.Transports.Http;
using TouchSocket.Core;
using TouchSocket.Http;
Expand Down Expand Up @@ -257,11 +258,15 @@ private async ValueTask HandleStreamableHttpMessageAsync(HttpContext context, Ca
}
}

JsonRpcRequest? jsonRpcRequest;
var sessionIdStr = request.Headers.Get(SessionIdHeader).First;

// 将 body 读取并解析为 JsonDocument,通过 JsonElement 检测消息类型。
ReadOnlyMemory<byte> bodyBytes;
JsonDocument bodyDoc;
try
{
var bodyBytes = await request.GetContentAsync();
jsonRpcRequest = await _manager.ReadRequestAsync(bodyBytes);
bodyBytes = await request.GetContentAsync();
bodyDoc = JsonDocument.Parse(bodyBytes);
}
catch (JsonException)
{
Expand All @@ -270,71 +275,125 @@ private async ValueTask HandleStreamableHttpMessageAsync(HttpContext context, Ca
return;
}

if (jsonRpcRequest == null)
using (bodyDoc)
{
Log.Warn($"[McpServer][TouchSocket] POST request rejected: Empty body.");
await context.RespondHttpError(HttpStatusCode.BadRequest, "Empty body");
return;
}
var bodyElement = bodyDoc.RootElement;

var isInitialize = jsonRpcRequest.Method == RequestMethods.Initialize;
var sessionIdStr = request.Headers.Get(SessionIdHeader).First;
TouchSocketHttpServerTransportSession? session;
// 检测是 JSON-RPC 请求(有 method)还是响应(无 method,有 result 或 error)。
var isResponse = !bodyElement.TryGetProperty("method", out _)
&& (bodyElement.TryGetProperty("result", out _) || bodyElement.TryGetProperty("error", out _));

if (isInitialize)
{
// 初始化请求,创建新 Session
var newSessionId = _manager.MakeNewSessionId();
var newSession = new TouchSocketHttpServerTransportSession(_manager, newSessionId.Id);
if (isResponse)
{
// 客户端响应服务器发起的请求(如 sampling/createMessage)。
if (string.IsNullOrEmpty(sessionIdStr) || !_sessions.TryGetValue(sessionIdStr, out var responseSession))
{
Log.Warn($"[McpServer][TouchSocket] Response routing failed: Session not found. SessionId={sessionIdStr}");
await context.RespondHttpError(HttpStatusCode.NotFound, "Session not found");
return;
}

JsonRpcResponse? jsonRpcResponse;
try
{
jsonRpcResponse = await _manager.ReadResponseAsync(bodyBytes);
}
catch (JsonException)
{
await context.RespondHttpError(HttpStatusCode.BadRequest, "Invalid JSON response");
return;
}

if (jsonRpcResponse is not null)
{
responseSession.HandleResponseAsync(jsonRpcResponse);
}

await context.RespondHttpSuccess(HttpStatusCode.Accepted);
return;
}

if (_sessions.TryAdd(newSessionId.Id, newSession))
JsonRpcRequest? jsonRpcRequest;
try
{
session = newSession;
_manager.Add(session);
context.Response.Headers.Add(SessionIdHeader, newSessionId.Id);
Log.Info($"[McpServer][TouchSocket] Session created. SessionId={newSessionId.Id}");
jsonRpcRequest = await _manager.ReadRequestAsync(bodyBytes);
}
else
catch (JsonException)
{
Log.Error($"[McpServer][TouchSocket] Session ID collision. SessionId={newSessionId.Id}");
await context.RespondHttpError(HttpStatusCode.InternalServerError, "Session ID collision");
Log.Warn($"[McpServer][TouchSocket] POST request rejected: Invalid JSON.");
await context.RespondHttpError(HttpStatusCode.BadRequest, "Invalid JSON");
return;
}
}
else
{
if (string.IsNullOrEmpty(sessionIdStr))

if (jsonRpcRequest == null)
{
Log.Warn($"[McpServer][TouchSocket] POST request rejected: Missing Mcp-Session-Id header. Method={jsonRpcRequest.Method}");
await context.RespondHttpError(HttpStatusCode.BadRequest, "Missing Mcp-Session-Id header");
Log.Warn($"[McpServer][TouchSocket] POST request rejected: Empty body.");
await context.RespondHttpError(HttpStatusCode.BadRequest, "Empty body");
return;
}

if (!_sessions.TryGetValue(sessionIdStr, out session))
var isInitialize = jsonRpcRequest.Method == RequestMethods.Initialize;
TouchSocketHttpServerTransportSession? session;

if (isInitialize)
{
Log.Warn($"[McpServer][TouchSocket] POST request rejected: Session not found. SessionId={sessionIdStr}, Method={jsonRpcRequest.Method}");
await context.RespondHttpError(HttpStatusCode.NotFound, "Session not found");
return;
// 初始化请求,创建新 Session
var newSessionId = _manager.MakeNewSessionId();
var newSession = new TouchSocketHttpServerTransportSession(_manager, newSessionId.Id);

if (_sessions.TryAdd(newSessionId.Id, newSession))
{
session = newSession;
_manager.Add(session);
context.Response.Headers.Add(SessionIdHeader, newSessionId.Id);
Log.Info($"[McpServer][TouchSocket] Session created. SessionId={newSessionId.Id}");
}
else
{
Log.Error($"[McpServer][TouchSocket] Session ID collision. SessionId={newSessionId.Id}");
await context.RespondHttpError(HttpStatusCode.InternalServerError, "Session ID collision");
return;
}
}
else
{
if (string.IsNullOrEmpty(sessionIdStr))
{
Log.Warn($"[McpServer][TouchSocket] POST request rejected: Missing Mcp-Session-Id header. Method={jsonRpcRequest.Method}");
await context.RespondHttpError(HttpStatusCode.BadRequest, "Missing Mcp-Session-Id header");
return;
}

if (!_sessions.TryGetValue(sessionIdStr, out session))
{
Log.Warn($"[McpServer][TouchSocket] POST request rejected: Session not found. SessionId={sessionIdStr}, Method={jsonRpcRequest.Method}");
await context.RespondHttpError(HttpStatusCode.NotFound, "Session not found");
return;
}
}
}

Log.Debug($"[McpServer][TouchSocket] Handling JSON-RPC request. SessionId={session.SessionId}, Method={jsonRpcRequest.Method}, MessageId={jsonRpcRequest.Id}");
Log.Debug($"[McpServer][TouchSocket] Handling JSON-RPC request. SessionId={session.SessionId}, Method={jsonRpcRequest.Method}, MessageId={jsonRpcRequest.Id}");

var jsonRpcResponse = await _manager.HandleRequestAsync(jsonRpcRequest,
s => s.AddHttpTransportServices(session.SessionId, request),
cancellationToken: cancellationToken);
var jsonRpcResponse2 = await _manager.HandleRequestAsync(jsonRpcRequest,
s =>
{
s.AddHttpTransportServices(session.SessionId, request);
s.AddTransportSession(session);
},
cancellationToken: cancellationToken);

if (jsonRpcResponse != null)
{
// Request: Success or Failed.
Log.Debug($"[McpServer][TouchSocket] Sending JSON-RPC response. SessionId={session.SessionId}, Method={jsonRpcRequest.Method}, MessageId={jsonRpcRequest.Id}");
await context.RespondJsonRpcAsync(_manager, HttpStatusCode.OK, jsonRpcResponse);
}
else
{
// Notification: No need to respond.
Log.Debug($"[McpServer][TouchSocket] No response for notification. SessionId={session.SessionId}, Method={jsonRpcRequest.Method}, MessageId={jsonRpcRequest.Id}");
await context.RespondHttpSuccess(HttpStatusCode.Accepted);
if (jsonRpcResponse2 != null)
{
// Request: Success or Failed.
Log.Debug($"[McpServer][TouchSocket] Sending JSON-RPC response. SessionId={session.SessionId}, Method={jsonRpcRequest.Method}, MessageId={jsonRpcRequest.Id}");
await context.RespondJsonRpcAsync(_manager, HttpStatusCode.OK, jsonRpcResponse2);
}
else
{
// Notification: No need to respond.
Log.Debug($"[McpServer][TouchSocket] No response for notification. SessionId={session.SessionId}, Method={jsonRpcRequest.Method}, MessageId={jsonRpcRequest.Id}");
await context.RespondHttpSuccess(HttpStatusCode.Accepted);
}
}
}

Expand Down
Loading
Loading