
墨西哥支付方式:是什么?
mcp-spring-webmvc 是 MCP 协议的 Spring Web MVC 实现,主要提供基于 SSE (Server-Sent Events) 的服务器端传输层实现。主要依赖
io.modelcontextprotocol.sdk
mcp
org.springframework
spring-webmvc
jakarta.servlet
jakarta.servlet-api
public class WebMvcSseServerTransport implements ServerMcpTransport {
// SSE 事件类型
public static final String MESSAGE_EVENT_TYPE = "message";
public static final String ENDPOINT_EVENT_TYPE = "endpoint";
// 默认端点
public static final String DEFAULT_SSE_ENDPOINT = "/sse";
// 会话管理
private final ConcurrentHashMap sessions;
}
主要功能:SSE 连接管理消息处理会话生命周期管理错误处理
@Configuration
@EnableWebMvc
class Config {
@Bean
public WebMvcSseServerTransport webMvcSseServerTransport() {
return new WebMvcSseServerTransport(
new ObjectMapper(),
"/mcp/message"
);
}
@Bean
public RouterFunction routerFunction(
WebMvcSseServerTransport transport) {
return transport.getRouterFunction();
}
}
服务器创建
// 创建 MCP 服务器
var server = McpServer.sync(transport)
.serverInfo("my-server", "1.0.0")
.capabilities(capabilities)
.tools(calculatorTool)
.resources(fileResource)
.build();
@Test
void testToolCallSuccess() {
// 1. 创建工具
var tool = new SyncToolRegistration(
new Tool("calculator", "计算器", schema),
request -> new CallToolResult(calculate(request))
);
// 2. 创建服务器
var server = McpServer.sync(transport)
.capabilities(capabilities)
.tools(tool)
.build();
// 3. 创建客户端
var client = McpClient.sync(clientTransport).build();
// 4. 测试调用
var result = client.callTool(
new CallToolRequest("calculator", params)
);
assertThat(result).isNotNull();
}
测试配置
try {
server.handleMessage(request);
} catch (McpError e) {
return ServerResponse.badRequest()
.body(new McpError("Invalid message format"));
}
@PreDestroy
public void cleanup() {
transport.closeGracefully().block();
}
sessions.values().forEach(session -> {
session.close();
sessions.remove(session.getId());
});
mcp-spring-webmvc 模块提供了一个完整的 Spring Web MVC 集成实现,使得 MCP 服务器可以轻松集成到基于 Spring MVC 的应用中。
让我详细分析 WebMvcSseServerTransport 的实现原理:
public class WebMvcSseServerTransport implements ServerMcpTransport {
// 会话管理
private final ConcurrentHashMap sessions;
// 消息处理器
private Function<Mono, Mono> connectHandler;
// 路由配置
private final RouterFunction routerFunction;
}
private static class ClientSession {
private final String id; // 会话ID
private final SseBuilder sseBuilder; // SSE构建器
void close() {
logger.debug("Closing session: {}", id);
try {
sseBuilder.complete();
} catch (Exception e) {
logger.warn("Failed to complete SSE emitter for session {}", id);
}
}
}
public WebMvcSseServerTransport(ObjectMapper objectMapper, String messageEndpoint) {
this.routerFunction = RouterFunctions.route()
.GET(DEFAULT_SSE_ENDPOINT, this::handleSseConnection) // SSE连接处理
.POST(messageEndpoint, this::handleMessage) // 消息处理
.build();
}
private ServerResponse handleSseConnection(ServerRequest request) {
// 1. 生成会话ID
String sessionId = UUID.randomUUID().toString();
// 2. 创建SSE响应
return ServerResponse.sse(sseBuilder -> {
// 3. 注册生命周期回调
sseBuilder.onComplete(() -> sessions.remove(sessionId));
sseBuilder.onTimeout(() -> sessions.remove(sessionId));
// 4. 创建并保存会话
ClientSession session = new ClientSession(sessionId, sseBuilder);
this.sessions.put(sessionId, session);
// 5. 发送初始端点事件
session.sseBuilder.id(session.id)
.event(ENDPOINT_EVENT_TYPE)
.data(messageEndpoint);
});
}
private ServerResponse handleMessage(ServerRequest request) {
try {
// 1. 解析消息
String body = request.body(String.class);
McpSchema.JSONRPCMessage message =
McpSchema.deserializeJsonRpcMessage(objectMapper, body);
// 2. 处理消息
McpSchema.JSONRPCMessage response =
Mono.just(message)
.transform(connectHandler)
.block();
// 3. 返回响应
return ServerResponse.ok().build();
} catch (Exception e) {
return ServerResponse.badRequest()
.body(new McpError("Invalid message format"));
}
}
public Mono sendMessage(McpSchema.JSONRPCMessage message) {
return Mono.fromRunnable(() -> {
// 1. 序列化消息
String jsonText = objectMapper.writeValueAsString(message);
// 2. 广播到所有会话
sessions.values().forEach(session -> {
try {
session.sseBuilder
.id(session.id)
.event(MESSAGE_EVENT_TYPE)
.data(jsonText);
} catch (Exception e) {
logger.error("Failed to send message to session {}", session.id);
session.sseBuilder.error(e);
}
});
});
}
线程安全:使用 ConcurrentHashMap 管理会话原子操作保证同步消息处理
生命周期管理:
public Mono closeGracefully() {
return Mono.fromRunnable(() -> {
this.isClosing = true;
// 关闭所有会话
sessions.values().forEach(session -> {
session.close();
sessions.remove(session.id);
});
});
}
错误处理:
try {
// 处理消息
session.sseBuilder.data(jsonText);
} catch (Exception e) {
// 错误通知
session.sseBuilder.error(e);
// 日志记录
logger.error("Error sending message: {}", e.getMessage());
}
资源管理:
// 会话自动清理
sseBuilder.onComplete(() -> sessions.remove(sessionId));
sseBuilder.onTimeout(() -> sessions.remove(sessionId));
sequenceDiagram
participant Client
participant Transport
participant Session
participant Handler
participant Client
Client->>Transport: GET /sse
Transport->>Session: 创建会话
Transport->>Client: SSE连接建立
Transport->>Client: 发送端点事件
Client->>Transport: POST /message
Transport->>Handler: 处理消息
Handler->>Transport: 返回响应
Transport->>Client: 发送响应
Transport->>Sessions: 广播消息
Sessions->>Client: SSE事件推送
@Configuration
@EnableWebMvc
class Config {
@Bean
public WebMvcSseServerTransport sseTransport() {
return new WebMvcSseServerTransport(
new ObjectMapper(),
"/mcp/message"
);
}
@Bean
public McpServer mcpServer(WebMvcSseServerTransport transport) {
return McpServer.sync(transport)
.serverInfo("my-server", "1.0.0")
.capabilities(capabilities)
.build();
}
}
WebMvcSseServerTransport 通过 Spring Web MVC 提供了一个完整的 SSE 传输层实现,支持双向通信、会话管理和消息广播等核心功能。
使用方法让我通过具体案例来说明 WebMvcSseServerTransport 的使用:
@Configuration
@EnableWebMvc
class McpConfig {
@Bean
public WebMvcSseServerTransport mcpTransport() {
return new WebMvcSseServerTransport(
new ObjectMapper(),
"/mcp/message" // 消息端点
);
}
@Bean
public RouterFunction mcpRoutes(WebMvcSseServerTransport transport) {
return transport.getRouterFunction();
}
}
// 1. 创建服务器能力配置
var capabilities = McpSchema.ServerCapabilities.builder()
.resources(true) // 支持资源访问
.tools(true) // 支持工具调用
.prompts(true) // 支持提示模板
.logging() // 支持日志
.build();
// 2. 创建服务器
var mcpServer = McpServer.sync(mcpTransport)
.serverInfo("demo-server", "1.0.0")
.capabilities(capabilities)
.build();
// 1. 定义计算器工具
var calculatorTool = new SyncToolRegistration(
new Tool("calculator", "基础计算器", schema),
request -> {
Map params = request.arguments();
double result = calculate(params);
return new CallToolResult(
List.of(new TextContent(String.valueOf(result))),
null
);
}
);
// 2. 创建带工具的服务器
var mcpServer = McpServer.sync(mcpTransport)
.capabilities(capabilities)
.tools(calculatorTool)
.build();
// 1. 定义文件资源处理器
var fileResource = new SyncResourceRegistration(
new Resource("file://data", "数据文件"),
request -> {
String content = readFile(request.uri());
return new ReadResourceResult(
new TextContent(content)
);
}
);
// 2. 创建带资源的服务器
var mcpServer = McpServer.sync(mcpTransport)
.capabilities(capabilities)
.resources(fileResource)
.build();
@Configuration
class AiAssistantConfig {
@Bean
public WebMvcSseServerTransport mcpTransport() {
return new WebMvcSseServerTransport(
new ObjectMapper(),
"/ai/message"
);
}
@Bean
public McpServer aiServer(WebMvcSseServerTransport transport) {
// 1. 定义工具
var searchTool = new SyncToolRegistration(
new Tool("search", "搜索知识库", searchSchema),
this::performSearch
);
var writeTool = new SyncToolRegistration(
new Tool("write", "生成文档", writeSchema),
this::generateDocument
);
// 2. 定义资源
var knowledgeBase = new SyncResourceRegistration(
new Resource("kb://docs", "知识库文档"),
this::readKnowledgeBase
);
// 3. 创建服务器
return McpServer.sync(transport)
.serverInfo("ai-assistant", "1.0.0")
.capabilities(ServerCapabilities.builder()
.tools(true)
.resources(true)
.prompts(true)
.build())
.tools(searchTool, writeTool)
.resources(knowledgeBase)
.build();
}
}
@Service
class ResourceManager {
private final McpServer mcpServer;
public void updateResource(String uri, String content) {
// 1. 更新资源
saveResource(uri, content);
// 2. 通知客户端资源变更
mcpServer.notifyResourcesListChanged();
}
public void addNewTool(Tool tool) {
// 1. 添加新工具
mcpServer.addTool(new SyncToolRegistration(
tool,
this::handleToolCall
));
// 2. 通知客户端工具变更
mcpServer.notifyToolsListChanged();
}
}
@Component
class McpErrorHandler {
private final McpServer mcpServer;
public void handleError(Exception e) {
// 1. 记录错误
logger.error("MCP error: ", e);
// 2. 发送日志通知
mcpServer.loggingNotification(
new LoggingMessageNotification(
LoggingLevel.ERROR,
"Operation failed: " + e.getMessage()
)
);
}
}
@Component
class McpServerLifecycle {
private final McpServer mcpServer;
private final WebMvcSseServerTransport transport;
@PostConstruct
public void init() {
// 初始化服务器
mcpServer.initialize();
}
@PreDestroy
public void shutdown() {
try {
// 1. 优雅关闭服务器
mcpServer.closeGracefully();
// 2. 关闭传输层
transport.closeGracefully()
.block(Duration.ofSeconds(5));
} catch (Exception e) {
logger.error("Shutdown error", e);
// 强制关闭
mcpServer.close();
}
}
}
这些案例展示了 WebMvcSseServerTransport 在不同场景下的使用方式,包括基础配置、工具和资源管理、错误处理以及生命周期管理等方面。通过这些案例,可以更好地理解如何在实际应用中使用 WebMvcSseServerTransport。