所有文章 > AI驱动 > 大模型上下文协议与Spring开发集成篇——mcp-spring-webmvc原理
大模型上下文协议与Spring开发集成篇——mcp-spring-webmvc原理

大模型上下文协议与Spring开发集成篇——mcp-spring-webmvc原理

mcp-spring-webmvc 模块介绍

1. 项目概述

mcp-spring-webmvc 是 MCP 协议的 Spring Web MVC 实现,主要提供基于 SSE (Server-Sent Events) 的服务器端传输层实现。主要依赖


    
    
        io.modelcontextprotocol.sdk
        mcp
    
    
    
        org.springframework
        spring-webmvc
    
    
    
        jakarta.servlet
        jakarta.servlet-api
    

2. 核心组件WebMvcSseServerTransport

public class WebMvcSseServerTransport implements ServerMcpTransport {
    // SSE 事件类型
    public static final String MESSAGE_EVENT_TYPE = "message";
    public static final String ENDPOINT_EVENT_TYPE = "endpoint";
    // 默认端点
    public static final String DEFAULT_SSE_ENDPOINT = "/sse";
    // 会话管理
    private final ConcurrentHashMap sessions;
}

主要功能:SSE 连接管理消息处理会话生命周期管理错误处理

3. 使用示例

Spring 配置

@Configuration
@EnableWebMvc
class Config {
    @Bean
    public WebMvcSseServerTransport webMvcSseServerTransport() {
        return new WebMvcSseServerTransport(
            new ObjectMapper(),
            "/mcp/message"
        );
    }
    @Bean
    public RouterFunction routerFunction(
        WebMvcSseServerTransport transport) {
        return transport.getRouterFunction();
    }
}

服务器创建

// 创建 MCP 服务器
var server = McpServer.sync(transport)
    .serverInfo("my-server", "1.0.0")
    .capabilities(capabilities)
    .tools(calculatorTool)
    .resources(fileResource)
    .build();

4. 主要特性

  • 同步/异步支持:支持同步操作支持异步回调支持响应式编程
  • 会话管理:唯一会话 ID会话生命周期管理自动清理机制
  • 消息处理:JSON-RPC 消息序列化消息路由错误处理
  • 安全性:线程安全设计资源清理异常处理

5. 测试支持

集成测试

@Test
void testToolCallSuccess() {
    // 1. 创建工具
    var tool = new SyncToolRegistration(
        new Tool("calculator", "计算器", schema),
        request -> new CallToolResult(calculate(request))
    );
    // 2. 创建服务器
    var server = McpServer.sync(transport)
        .capabilities(capabilities)
        .tools(tool)
        .build();
    // 3. 创建客户端
    var client = McpClient.sync(clientTransport).build();
    // 4. 测试调用
    var result = client.callTool(
        new CallToolRequest("calculator", params)
    );
    assertThat(result).isNotNull();
}

测试配置


6. 最佳实践

错误处理

try {
    server.handleMessage(request);
} catch (McpError e) {
    return ServerResponse.badRequest()
        .body(new McpError("Invalid message format"));
}

资源管理

@PreDestroy
public void cleanup() {
    transport.closeGracefully().block();
}

会话管理

sessions.values().forEach(session -> {
    session.close();
    sessions.remove(session.getId());
});

mcp-spring-webmvc 模块提供了一个完整的 Spring Web MVC 集成实现,使得 MCP 服务器可以轻松集成到基于 Spring MVC 的应用中。

实现原理

让我详细分析 WebMvcSseServerTransport 的实现原理:

1. 核心架构

public class WebMvcSseServerTransport implements ServerMcpTransport {
    // 会话管理
    private final ConcurrentHashMap sessions;
    // 消息处理器
    private Function<Mono, Mono> connectHandler;
    // 路由配置
    private final RouterFunction routerFunction;
}

2. 主要组件

2.1 会话管理

private static class ClientSession {
    private final String id;                       // 会话ID
    private final SseBuilder sseBuilder;               // SSE构建器
    void close() {
        logger.debug("Closing session: {}", id);
        try {
            sseBuilder.complete();
        } catch (Exception e) {
            logger.warn("Failed to complete SSE emitter for session {}", id);
        }
    }
}

2.2 路由处理

public WebMvcSseServerTransport(ObjectMapper objectMapper, String messageEndpoint) {
    this.routerFunction = RouterFunctions.route()
        .GET(DEFAULT_SSE_ENDPOINT, this::handleSseConnection)  // SSE连接处理
        .POST(messageEndpoint, this::handleMessage)               // 消息处理
        .build();
}

3. 核心流程

3.1 SSE 连接建立

private ServerResponse handleSseConnection(ServerRequest request) {
    // 1. 生成会话ID
    String sessionId = UUID.randomUUID().toString();
    // 2. 创建SSE响应
    return ServerResponse.sse(sseBuilder -> {
        // 3. 注册生命周期回调
        sseBuilder.onComplete(() -> sessions.remove(sessionId));
        sseBuilder.onTimeout(() -> sessions.remove(sessionId));
        // 4. 创建并保存会话
        ClientSession session = new ClientSession(sessionId, sseBuilder);
        this.sessions.put(sessionId, session);
        // 5. 发送初始端点事件
        session.sseBuilder.id(session.id)
            .event(ENDPOINT_EVENT_TYPE)
            .data(messageEndpoint);
    });
}

3.2 消息处理

private ServerResponse handleMessage(ServerRequest request) {
    try {
        // 1. 解析消息
        String body = request.body(String.class);
        McpSchema.JSONRPCMessage message =
            McpSchema.deserializeJsonRpcMessage(objectMapper, body);
        // 2. 处理消息
        McpSchema.JSONRPCMessage response =
            Mono.just(message)
            .transform(connectHandler)
            .block();
        // 3. 返回响应
        return ServerResponse.ok().build();
    } catch (Exception e) {
        return ServerResponse.badRequest()
            .body(new McpError("Invalid message format"));
    }
}

3.3 消息广播

public Mono sendMessage(McpSchema.JSONRPCMessage message) {
    return Mono.fromRunnable(() -> {
        // 1. 序列化消息
        String jsonText = objectMapper.writeValueAsString(message);
        // 2. 广播到所有会话
        sessions.values().forEach(session -> {
            try {
                session.sseBuilder
                    .id(session.id)
                    .event(MESSAGE_EVENT_TYPE)
                    .data(jsonText);
            } catch (Exception e) {
                logger.error("Failed to send message to session {}", session.id);
                session.sseBuilder.error(e);
            }
        });
    });
}

4. 关键特性

  • 线程安全:使用 ConcurrentHashMap 管理会话原子操作保证同步消息处理

  • 生命周期管理:

    public Mono closeGracefully() {
    return Mono.fromRunnable(() -> {
        this.isClosing = true;
        // 关闭所有会话
        sessions.values().forEach(session -> {
            session.close();
            sessions.remove(session.id);
        });
    });
    }
  • 错误处理:

    try {
    // 处理消息
    session.sseBuilder.data(jsonText);
    } catch (Exception e) {
    // 错误通知
    session.sseBuilder.error(e);
    // 日志记录
    logger.error("Error sending message: {}", e.getMessage());
    }
  • 资源管理:

    // 会话自动清理
    sseBuilder.onComplete(() -> sessions.remove(sessionId));
    sseBuilder.onTimeout(() -> sessions.remove(sessionId));

5. 工作流程

sequenceDiagram
    participant Client
    participant Transport
    participant Session
    participant Handler
    participant Client
    Client->>Transport: GET /sse
    Transport->>Session: 创建会话
    Transport->>Client: SSE连接建立
    Transport->>Client: 发送端点事件
    Client->>Transport: POST /message
    Transport->>Handler: 处理消息
    Handler->>Transport: 返回响应
    Transport->>Client: 发送响应
    Transport->>Sessions: 广播消息
    Sessions->>Client: SSE事件推送

6. 使用示例

@Configuration
@EnableWebMvc
class Config {
    @Bean
    public WebMvcSseServerTransport sseTransport() {
        return new WebMvcSseServerTransport(
            new ObjectMapper(),
            "/mcp/message"
        );
    }
    @Bean
    public McpServer mcpServer(WebMvcSseServerTransport transport) {
        return McpServer.sync(transport)
            .serverInfo("my-server", "1.0.0")
            .capabilities(capabilities)
            .build();
    }
}

WebMvcSseServerTransport 通过 Spring Web MVC 提供了一个完整的 SSE 传输层实现,支持双向通信、会话管理和消息广播等核心功能。
使用方法让我通过具体案例来说明 WebMvcSseServerTransport 的使用:

1. 基础配置案例

Spring MVC 配置

@Configuration
@EnableWebMvc
class McpConfig {
    @Bean
    public WebMvcSseServerTransport mcpTransport() {
        return new WebMvcSseServerTransport(
            new ObjectMapper(),
            "/mcp/message"  // 消息端点
        );
    }
    @Bean
    public RouterFunction mcpRoutes(WebMvcSseServerTransport transport) {
        return transport.getRouterFunction();
    }
}

2. 服务器端使用案例

2.1 基本服务器设置

// 1. 创建服务器能力配置
var capabilities = McpSchema.ServerCapabilities.builder()
    .resources(true)  // 支持资源访问
    .tools(true)      // 支持工具调用
    .prompts(true)    // 支持提示模板
    .logging()        // 支持日志
    .build();
// 2. 创建服务器
var mcpServer = McpServer.sync(mcpTransport)
    .serverInfo("demo-server", "1.0.0")
    .capabilities(capabilities)
    .build();

2.2 添加工具支持

// 1. 定义计算器工具
var calculatorTool = new SyncToolRegistration(
    new Tool("calculator", "基础计算器", schema),
    request -> {
        Map params = request.arguments();
        double result = calculate(params);
        return new CallToolResult(
            List.of(new TextContent(String.valueOf(result))),
            null
        );
    }
);
// 2. 创建带工具的服务器
var mcpServer = McpServer.sync(mcpTransport)
    .capabilities(capabilities)
    .tools(calculatorTool)
    .build();

2.3 添加资源支持

// 1. 定义文件资源处理器
var fileResource = new SyncResourceRegistration(
    new Resource("file://data", "数据文件"),
    request -> {
        String content = readFile(request.uri());
        return new ReadResourceResult(
            new TextContent(content)
        );
    }
);
// 2. 创建带资源的服务器
var mcpServer = McpServer.sync(mcpTransport)
    .capabilities(capabilities)
    .resources(fileResource)
    .build();

3. 完整应用案例

3.1 AI助手服务器

@Configuration
class AiAssistantConfig {
    @Bean
    public WebMvcSseServerTransport mcpTransport() {
        return new WebMvcSseServerTransport(
            new ObjectMapper(),
            "/ai/message"
        );
    }
    @Bean
    public McpServer aiServer(WebMvcSseServerTransport transport) {
        // 1. 定义工具
        var searchTool = new SyncToolRegistration(
            new Tool("search", "搜索知识库", searchSchema),
            this::performSearch
        );
        var writeTool = new SyncToolRegistration(
            new Tool("write", "生成文档", writeSchema),
            this::generateDocument
        );
        // 2. 定义资源
        var knowledgeBase = new SyncResourceRegistration(
            new Resource("kb://docs", "知识库文档"),
            this::readKnowledgeBase
        );
        // 3. 创建服务器
        return McpServer.sync(transport)
            .serverInfo("ai-assistant", "1.0.0")
            .capabilities(ServerCapabilities.builder()
                .tools(true)
                .resources(true)
                .prompts(true)
                .build())
            .tools(searchTool, writeTool)
            .resources(knowledgeBase)
            .build();
    }
}

3.2 变更通知案例

@Service
class ResourceManager {
    private final McpServer mcpServer;
    public void updateResource(String uri, String content) {
        // 1. 更新资源
        saveResource(uri, content);
        // 2. 通知客户端资源变更
        mcpServer.notifyResourcesListChanged();
    }
    public void addNewTool(Tool tool) {
        // 1. 添加新工具
        mcpServer.addTool(new SyncToolRegistration(
            tool,
            this::handleToolCall
        ));
        // 2. 通知客户端工具变更
        mcpServer.notifyToolsListChanged();
    }
}

4. 错误处理案例

@Component
class McpErrorHandler {
    private final McpServer mcpServer;
    public void handleError(Exception e) {
        // 1. 记录错误
        logger.error("MCP error: ", e);
        // 2. 发送日志通知
        mcpServer.loggingNotification(
            new LoggingMessageNotification(
                LoggingLevel.ERROR,
                "Operation failed: " + e.getMessage()
            )
        );
    }
}

5. 生命周期管理案例

@Component
class McpServerLifecycle {
    private final McpServer mcpServer;
    private final WebMvcSseServerTransport transport;
    @PostConstruct
    public void init() {
        // 初始化服务器
        mcpServer.initialize();
    }
    @PreDestroy
    public void shutdown() {
        try {
            // 1. 优雅关闭服务器
            mcpServer.closeGracefully();
            // 2. 关闭传输层
            transport.closeGracefully()
                .block(Duration.ofSeconds(5));
        } catch (Exception e) {
            logger.error("Shutdown error", e);
            // 强制关闭
            mcpServer.close();
        }
    }
}

这些案例展示了 WebMvcSseServerTransport 在不同场景下的使用方式,包括基础配置、工具和资源管理、错误处理以及生命周期管理等方面。通过这些案例,可以更好地理解如何在实际应用中使用 WebMvcSseServerTransport。

原文转载自:https://mp.weixin.qq.com/s/XtQF7qQpgIq7hsCtOzQoTg

#你可能也喜欢这些API文章!