Having covered MCP's three core primitives, this article digs into the transport layer: how MCP messages travel between Client and Server, and how to deploy an MCP Server to production.
🎯 Transport Overview#
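The MCP protocol is decoupled from its transport layer and supports several transports:
| Transport | Use case | Characteristics |
|---|---|---|
| stdio | Local tools, CLIs | Simple and reliable; one process per connection |
| Streamable HTTP | Remote services, web | Supports multiple clients; scalable |
| Custom | Special requirements | Must implement the protocol's transport contract |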
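To make the decoupling concrete, here is a minimal sketch of what a custom transport boils down to: a way to send a JSON-RPC message and a callback for incoming ones. The `MessageChannel` interface and the `createLinkedPair` and `attachPingHandler` helpers are hypothetical names used only for illustration; they are a simplification, not the SDK's actual `Transport` interface.

```typescript
// A simplified, hypothetical stand-in for a transport contract.
// (The real SDK defines its own Transport interface; this is only a sketch.)
type JsonRpcMessage = {
  jsonrpc: '2.0'
  id?: number | string
  method?: string
  params?: unknown
  result?: unknown
}

interface MessageChannel {
  send(message: JsonRpcMessage): void
  onmessage?: (message: JsonRpcMessage) => void
}

// Two in-memory endpoints wired back to back: whatever one side sends,
// the other side receives. No stdio or HTTP is involved.
function createLinkedPair(): [MessageChannel, MessageChannel] {
  const a: MessageChannel = { send: (m) => b.onmessage?.(m) }
  const b: MessageChannel = { send: (m) => a.onmessage?.(m) }
  return [a, b]
}

// Protocol-level logic that knows nothing about the transport underneath.
function attachPingHandler(channel: MessageChannel): void {
  channel.onmessage = (message) => {
    if (message.method === 'ping' && message.id !== undefined) {
      channel.send({ jsonrpc: '2.0', id: message.id, result: {} })
    }
  }
}

const [clientSide, serverSide] = createLinkedPair()
attachPingHandler(serverSide)

// Collect whatever the client side receives back.
const received: JsonRpcMessage[] = []
clientSide.onmessage = (message) => {
  received.push(message)
}
clientSide.send({ jsonrpc: '2.0', id: 1, method: 'ping' })
```

The handler never touches stdin, sockets, or HTTP, which is why the same protocol logic can sit on top of any of the transports in the table.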
stdio Transport#
This is the simplest transport: the Client launches the Server as a child process and communicates with it over standard input and output.
How It Works#
    ┌─────────┐  stdin (JSON-RPC)   ┌─────────┐
    │ Client  │ ──────────────────► │ Server  │
    │         │ ◄────────────────── │         │
    └─────────┘  stdout (JSON-RPC)  └─────────┘
                                         │
                                         ▼
                                   stderr (logs)

Server Implementation#
    import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
    import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
    import { z } from 'zod'

    const server = new McpServer({
      name: 'stdio-example',
      version: '1.0.0',
    })

    server.registerTool(
      'greet',
      {
        title: 'Greet',
        description: 'Greet the user',
        inputSchema: { name: z.string() },
      },
      async ({ name }) => ({
        content: [{ type: 'text', text: `Hello, ${name}!` }],
      })
    )

    // stdio transport
    const transport = new StdioServerTransport()
    await server.connect(transport)

    // Log to stderr (stdout is reserved for protocol messages)
    console.error('Server started on stdio')

Message Format#
Each message is a single line of JSON, and messages are separated by newline characters:
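    {"jsonrpc":"2.0","id":1,"method":"initialize","params":{...}}
    {"jsonrpc":"2.0","id":2,"method":"tools/call","params":{...}}

🔶 Note: a serialized message must not contain raw newline characters. Line breaks inside string values have to be escaped as \n, which standard JSON serializers do automatically.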
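The framing rule above can be sketched without the SDK. This is an illustrative sketch, not the SDK's implementation: `serializeMessage` and `LineDecoder` are hypothetical names. It shows that `JSON.stringify` keeps each message on one line, and that a reader has to buffer input because a chunk can end in the middle of a message.

```typescript
// Serialize one JSON-RPC message as a single line.
// JSON.stringify escapes newlines inside strings as "\n",
// so the output never contains a raw line break.
function serializeMessage(message: unknown): string {
  return JSON.stringify(message) + '\n'
}

// Accumulates raw chunks (which may split a message anywhere)
// and returns each complete line as a parsed message.
class LineDecoder {
  private buffer = ''

  push(chunk: string): unknown[] {
    this.buffer += chunk
    const messages: unknown[] = []
    let newlineIndex = this.buffer.indexOf('\n')
    while (newlineIndex !== -1) {
      const line = this.buffer.slice(0, newlineIndex).trim()
      this.buffer = this.buffer.slice(newlineIndex + 1)
      if (line.length > 0) messages.push(JSON.parse(line))
      newlineIndex = this.buffer.indexOf('\n')
    }
    return messages
  }
}

const wire =
  serializeMessage({ jsonrpc: '2.0', id: 1, method: 'ping' }) +
  serializeMessage({
    jsonrpc: '2.0',
    id: 2,
    method: 'tools/call',
    params: { name: 'greet', arguments: { name: 'line one\nline two' } },
  })

// Feed the data in two arbitrary pieces to simulate a split read.
const decoder = new LineDecoder()
const firstBatch = decoder.push(wire.slice(0, 30))
const secondBatch = decoder.push(wire.slice(30))
const decoded = [...firstBatch, ...secondBatch]
```

Both messages come out intact even though the split falls inside the first one, and the multi-line string argument survives the round trip because its line break travels in escaped form.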
Configuration Example (Claude Desktop)#
    {
      "mcpServers": {
        "my-server": {
          "command": "node",
          "args": ["/path/to/server.js"],
          "env": {
            "API_KEY": "your-api-key"
          }
        }
      }
    }

Streamable HTTP Transport#
Suited to remote deployments and to scenarios where multiple clients connect to the same server.
How It Works#
    ┌─────────┐   POST /mcp    ┌─────────┐
    │ Client  │ ─────────────► │ Server  │
    │         │ ◄───────────── │ (HTTP)  │
    └─────────┘   SSE / JSON   └─────────┘

- The client sends requests via HTTP POST
- The server responds with either a JSON body or an SSE stream
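Because the server may answer with either format, a client has to branch on the response's Content-Type. The sketch below illustrates that branching with a hypothetical `extractMessages` helper. It is deliberately simplified: it assumes the whole body is already in memory and reads only the SSE `data:` field, whereas a real client would parse the stream incrementally.

```typescript
// Extract JSON-RPC messages from a response body, given its Content-Type.
// Simplified: assumes the whole body is already in memory and only reads
// the SSE "data:" field (event names, ids, and retry hints are ignored).
function extractMessages(contentType: string, body: string): unknown[] {
  if (contentType.startsWith('application/json')) {
    const parsed: unknown = JSON.parse(body)
    return Array.isArray(parsed) ? parsed : [parsed]
  }
  if (contentType.startsWith('text/event-stream')) {
    const messages: unknown[] = []
    // SSE events are separated by a blank line.
    for (const event of body.split(/\r?\n\r?\n/)) {
      const data = event
        .split(/\r?\n/)
        .filter((line) => line.startsWith('data:'))
        .map((line) => line.slice('data:'.length).replace(/^ /, ''))
        .join('\n')
      if (data.length > 0) messages.push(JSON.parse(data))
    }
    return messages
  }
  throw new Error(`Unexpected Content-Type: ${contentType}`)
}

// Sample payloads (made up for this sketch).
const jsonBody = '{"jsonrpc":"2.0","id":1,"result":{"ok":true}}'
const sseBody =
  'event: message\n' +
  'data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"t1","progress":1}}\n\n' +
  'event: message\n' +
  'data: {"jsonrpc":"2.0","id":1,"result":{"ok":true}}\n\n'

const fromJson = extractMessages('application/json', jsonBody)
const fromSse = extractMessages('text/event-stream', sseBody)
```

The SSE form lets the server emit intermediate notifications before the final result, which a single JSON body cannot do.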
Basic HTTP Server#
    import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
    import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
    import express from 'express'
    import { z } from 'zod'

    const server = new McpServer({
      name: 'http-example',
      version: '1.0.0',
    })

    server.registerTool(
      'add',
      {
        title: 'Addition',
        description: 'Compute the sum of two numbers',
        inputSchema: { a: z.number(), b: z.number() },
        outputSchema: { result: z.number() },
      },
      async ({ a, b }) => {
        const result = a + b
        return {
          content: [{ type: 'text', text: `${a} + ${b} = ${result}` }],
          structuredContent: { result },
        }
      }
    )

    // Express server
    const app = express()
    app.use(express.json())

    app.post('/mcp', async (req, res) => {
      // Stateless mode: no session ID, one transport per request, plain JSON responses
      const transport = new StreamableHTTPServerTransport({
        sessionIdGenerator: undefined,
        enableJsonResponse: true,
      })

      res.on('close', () => transport.close())

      await server.connect(transport)
      await transport.handleRequest(req, res, req.body)
    })

    const PORT = process.env.PORT || 3000
    app.listen(PORT, () => {
      console.log(`MCP Server running on http://localhost:${PORT}/mcp`)
    })

Supporting SSE Streaming Responses#
    app.post('/mcp', async (req, res) => {
      const transport = new StreamableHTTPServerTransport({
        sessionIdGenerator: undefined,
        enableJsonResponse: false, // respond with an SSE stream
      })

      res.on('close', () => transport.close())

      await server.connect(transport)
      await transport.handleRequest(req, res, req.body)
    })

Session Management#
For scenarios that need state, you can enable sessions:
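    import { randomUUID } from 'node:crypto'
    import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'

    // A stateful transport only accepts requests after it has handled `initialize`,
    // so the transport itself (not just the server) must be kept per session.
    // The session ID travels in the Mcp-Session-Id header, which the transport sets.
    type Session = { transport: StreamableHTTPServerTransport; lastSeen: number }
    const sessions = new Map<string, Session>()

    app.post('/mcp', async (req, res) => {
      const sessionId = req.headers['mcp-session-id'] as string | undefined
      const existing = sessionId ? sessions.get(sessionId) : undefined
      let transport: StreamableHTTPServerTransport

      if (existing) {
        // Reuse the transport created for this session
        existing.lastSeen = Date.now()
        transport = existing.transport
      } else if (!sessionId && isInitializeRequest(req.body)) {
        // New session: create a transport and a Server instance for it
        transport = new StreamableHTTPServerTransport({
          sessionIdGenerator: () => randomUUID(),
          enableJsonResponse: true,
          onsessioninitialized: (id) => {
            sessions.set(id, { transport, lastSeen: Date.now() })
          },
        })
        // createNewServer() stands for your own factory that returns a configured McpServer
        await createNewServer().connect(transport)
      } else {
        res.status(400).json({
          jsonrpc: '2.0',
          error: { code: -32000, message: 'Bad Request: No valid session ID provided' },
          id: null,
        })
        return
      }

      await transport.handleRequest(req, res, req.body)
    })

    // Periodically clean up expired sessions (the 30-minute idle limit is an assumed value)
    const SESSION_TTL_MS = 30 * 60 * 1000
    setInterval(() => {
      const now = Date.now()
      for (const [id, session] of sessions) {
        if (now - session.lastSeen > SESSION_TTL_MS) {
          session.transport.close()
          sessions.delete(id)
        }
      }
    }, 60000)

Security Considerations#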
1. Origin Validation#
Validate the Origin header to guard against DNS rebinding attacks:
    const ALLOWED_ORIGINS = ['https://example.com', 'http://localhost:3000']

    app.use('/mcp', (req, res, next) => {
      const origin = req.headers.origin

      // Reject browser requests from unknown origins.
      // Requests without an Origin header (non-browser clients) pass through.
      if (origin && !ALLOWED_ORIGINS.includes(origin)) {
        res.status(403).json({ error: 'Origin not allowed' })
        return
      }

      // Set CORS headers
      if (origin) {
        res.setHeader('Access-Control-Allow-Origin', origin)
      }
      res.setHeader('Access-Control-Allow-Methods', 'POST, OPTIONS')
      res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization')

      // Answer CORS preflight requests directly
      if (req.method === 'OPTIONS') {
        res.status(204).end()
        return
      }

      next()
    })

2. Authentication#
    import jwt from 'jsonwebtoken'

    const JWT_SECRET = process.env.JWT_SECRET!

    function authMiddleware(
      req: express.Request,
      res: express.Response,
      next: express.NextFunction
    ) {
      const authHeader = req.headers.authorization

      if (!authHeader?.startsWith('Bearer ')) {
        res.status(401).json({ error: 'Missing authorization token' })
        return
      }

      // Strip the "Bearer " prefix
      const token = authHeader.slice(7)

      try {
        const decoded = jwt.verify(token, JWT_SECRET)
        // Express's Request type has no `user` property, so keep the claims on res.locals
        res.locals.user = decoded
        next()
      } catch {
        res.status(401).json({ error: 'Invalid token' })
      }
    }

    app.post('/mcp', authMiddleware, async (req, res) => {
      // Handle the authenticated request
    })

3. Local Binding#
A service meant for local use should listen only on localhost:
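    // ✅ Safe: listen on the loopback interface only
    app.listen(3000, '127.0.0.1', () => {
      console.log('Server listening on 127.0.0.1:3000')
    })

    // ❌ Dangerous: listen on all interfaces
    app.listen(3000, '0.0.0.0') // Avoid this unless you really need it

Production Deployment#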
Docker Deployment#
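    # Dockerfile
    FROM node:20-alpine

    WORKDIR /app

    COPY package*.json ./
    # --omit=dev replaces the deprecated --only=production flag
    RUN npm ci --omit=dev

    COPY dist ./dist

    ENV NODE_ENV=production
    ENV PORT=3000
    # If the server reads HOST (as the complete example does, defaulting to 127.0.0.1),
    # it must listen on all interfaces inside the container for the published port to work
    ENV HOST=0.0.0.0

    EXPOSE 3000

    USER node

    CMD ["node", "dist/server.js"]

The matching Compose file:

    # docker-compose.yml
    version: '3.8'

    services:
      mcp-server:
        build: .
        ports:
          - '3000:3000'
        environment:
          - NODE_ENV=production
          - API_KEY=${API_KEY}
        healthcheck:
          # node:20-alpine ships BusyBox wget but not curl
          test: ['CMD', 'wget', '-q', '--spider', 'http://127.0.0.1:3000/health']
          interval: 30s
          timeout: 10s
          retries: 3
        restart: unless-stopped

Health Check Endpoint#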
    app.get('/health', (req, res) => {
      res.json({
        status: 'healthy',
        version: '1.0.0',
        uptime: process.uptime(),
      })
    })

Nginx Reverse Proxy#
    upstream mcp_backend {
        server 127.0.0.1:3000;
        keepalive 32;
    }

    server {
        listen 443 ssl http2;
        server_name mcp.example.com;

        ssl_certificate /etc/ssl/certs/mcp.crt;
        ssl_certificate_key /etc/ssl/private/mcp.key;

        location /mcp {
            proxy_pass http://mcp_backend;
            proxy_http_version 1.1;

            # SSE support: keep the upstream connection open and disable buffering
            proxy_set_header Connection '';
            proxy_buffering off;
            proxy_cache off;

            # Timeouts (long-lived streams)
            proxy_read_timeout 86400s;
            proxy_send_timeout 86400s;

            # Forward the original request details to the backend
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }
    }

PM2 Process Management#
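    // ecosystem.config.js
    // If package.json sets "type": "module", this CommonJS file likely needs
    // the .cjs extension (ecosystem.config.cjs) so Node loads it correctly.
    module.exports = {
      apps: [
        {
          name: 'mcp-server',
          script: './dist/server.js',
          // Cluster mode suits the stateless handler; in-memory sessions
          // are not shared between worker processes
          instances: 'max',
          exec_mode: 'cluster',
          env_production: {
            NODE_ENV: 'production',
            PORT: 3000,
          },
          max_memory_restart: '500M',
          error_file: './logs/error.log',
          out_file: './logs/out.log',
          merge_logs: true,
        },
      ],
    }

Common commands:

    # Start
    pm2 start ecosystem.config.js --env production

    # Check status
    pm2 status

    # View logs
    pm2 logs mcp-server

Complete HTTP Server Example#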
    import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
    import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
    import express from 'express'
    import cors from 'cors'
    import helmet from 'helmet'
    import { z } from 'zod'

    // Create the MCP Server
    function createMcpServer() {
      const server = new McpServer({
        name: 'production-server',
        version: '1.0.0',
      })

      // Register Tools
      server.registerTool(
        'echo',
        {
          title: 'Echo',
          description: 'Return the input message',
          inputSchema: { message: z.string() },
        },
        async ({ message }) => ({
          content: [{ type: 'text', text: `Echo: ${message}` }],
        })
      )

      // Register Resources
      server.registerResource(
        'status',
        'status://server',
        { title: 'Service Status', description: 'Runtime status of the server' },
        async (uri) => ({
          contents: [
            {
              uri: uri.href,
              mimeType: 'application/json',
              text: JSON.stringify(
                {
                  status: 'running',
                  uptime: process.uptime(),
                  memory: process.memoryUsage(),
                },
                null,
                2
              ),
            },
          ],
        })
      )

      return server
    }

    // Express application
    const app = express()

    // Security middleware
    app.use(helmet())
    app.use(
      cors({
        // Falls back to '*' (any origin) when ALLOWED_ORIGINS is unset; set it in production
        origin: process.env.ALLOWED_ORIGINS?.split(',') || '*',
        methods: ['POST', 'OPTIONS'],
        allowedHeaders: ['Content-Type', 'Authorization'],
      })
    )
    app.use(express.json({ limit: '10mb' }))

    // Health check
    app.get('/health', (req, res) => {
      res.json({ status: 'healthy' })
    })

    // MCP endpoint (stateless: a fresh server and transport per request)
    app.post('/mcp', async (req, res) => {
      const mcpServer = createMcpServer()

      const transport = new StreamableHTTPServerTransport({
        sessionIdGenerator: undefined,
        enableJsonResponse: true,
      })

      res.on('close', () => {
        transport.close()
      })

      try {
        await mcpServer.connect(transport)
        await transport.handleRequest(req, res, req.body)
      } catch (error) {
        console.error('MCP request error:', error)
        if (!res.headersSent) {
          res.status(500).json({ error: 'Internal server error' })
        }
      }
    })

    // Start the server
    const PORT = parseInt(process.env.PORT || '3000', 10)
    const HOST = process.env.HOST || '127.0.0.1'

    app.listen(PORT, HOST, () => {
      console.log(`MCP Server running on http://${HOST}:${PORT}`)
      console.log(`MCP endpoint: http://${HOST}:${PORT}/mcp`)
      console.log(`Health check: http://${HOST}:${PORT}/health`)
    })

package.json Configuration#
    {
      "name": "mcp-http-server",
      "version": "1.0.0",
      "type": "module",
      "scripts": {
        "build": "tsc",
        "start": "node dist/server.js",
        "dev": "tsx watch src/server.ts"
      },
      "dependencies": {
        "@modelcontextprotocol/sdk": "^1.0.0",
        "cors": "^2.8.5",
        "express": "^4.18.2",
        "helmet": "^7.1.0",
        "zod": "^3.22.0"
      },
      "devDependencies": {
        "@types/cors": "^2.8.17",
        "@types/express": "^4.17.21",
        "@types/node": "^20.10.0",
        "tsx": "^4.6.0",
        "typescript": "^5.3.0"
      }
    }

Summary#
In this article we took a close look at MCP Transport:
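✅ How the stdio Transport works and how to implement it
✅ Configuring the Streamable HTTP Transport
✅ SSE streaming responses
✅ Security considerations: Origin validation, authentication, local binding
✅ Production deployment: Docker, Nginx, PM2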
In the next article we'll learn how to build an MCP Client and integrate an MCP Server into your own application.