Skip to content

MCP Transport 与部署

前面我们学习了 MCP 的三大原语,这篇文章将深入传输层,了解 MCP 消息是如何在 Client 和 Server 之间传递的,以及如何将 MCP Server 部署到生产环境。

🎯 Transport 概述#

MCP 协议与传输层解耦,支持多种传输方式:

Transport适用场景特点
stdio本地工具、CLI简单可靠,每连接一进程
Streamable HTTP远程服务、Web支持多客户端,可扩展
自定义特殊需求需实现协议约定

stdio Transport#

这是最简单的传输方式,Client 启动 Server 作为子进程,通过标准输入输出通信。

工作原理#

┌─────────┐ stdin (JSON-RPC) ┌─────────┐
│ Client │ ──────────────────► │ Server │
│ │ ◄────────────────── │ │
└─────────┘ stdout (JSON-RPC) └─────────┘
▼ stderr (logs)

Server 实现#

server.ts
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import { z } from 'zod'
const server = new McpServer({
name: 'stdio-example',
version: '1.0.0',
})
server.registerTool(
'greet',
{
title: '问候',
description: '向用户问候',
inputSchema: { name: z.string() },
},
async ({ name }) => ({
content: [{ type: 'text', text: `你好,${name}` }],
})
)
// stdio 传输
const transport = new StdioServerTransport()
await server.connect(transport)
// 日志使用 stderr(stdout 被协议占用)
console.error('Server started on stdio')

消息格式#

每条消息是一个独立的 JSON 行,换行符分隔:

{"jsonrpc":"2.0","id":1,"method":"initialize","params":{...}}
{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{...}}

🔶 注意:消息内容不能包含换行符。

配置示例(Claude Desktop)#

{
"mcpServers": {
"my-server": {
"command": "node",
"args": ["/path/to/server.js"],
"env": {
"API_KEY": "your-api-key"
}
}
}
}

Streamable HTTP Transport#

适用于远程部署、多客户端连接的场景。

工作原理#

┌─────────┐ POST /mcp ┌─────────┐
│ Client │ ─────────────► │ Server │
│ │ ◄───────────── │ (HTTP) │
└─────────┘ SSE / JSON └─────────┘

基础 HTTP Server#

http-server.ts
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
import express from 'express'
import { z } from 'zod'
const server = new McpServer({
name: 'http-example',
version: '1.0.0',
})
server.registerTool(
'add',
{
title: '加法',
description: '计算两数之和',
inputSchema: { a: z.number(), b: z.number() },
outputSchema: { result: z.number() },
},
async ({ a, b }) => {
const result = a + b
return {
content: [{ type: 'text', text: `${a} + ${b} = ${result}` }],
structuredContent: { result },
}
}
)
// Express 服务器
const app = express()
app.use(express.json())
app.post('/mcp', async (req, res) => {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
enableJsonResponse: true,
})
res.on('close', () => transport.close())
await server.connect(transport)
await transport.handleRequest(req, res, req.body)
})
const PORT = process.env.PORT || 3000
app.listen(PORT, () => {
console.log(`MCP Server running on http://localhost:${PORT}/mcp`)
})

支持 SSE 流式响应#

app.post('/mcp', async (req, res) => {
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
enableJsonResponse: false, // 使用 SSE
})
res.on('close', () => transport.close())
await server.connect(transport)
await transport.handleRequest(req, res, req.body)
})

会话管理#

对于需要状态的场景,可以启用会话:

import { randomUUID } from 'crypto'
const sessions = new Map<string, McpServer>()
app.post('/mcp', async (req, res) => {
const sessionId = (req.headers['x-session-id'] as string) || randomUUID()
// 复用或创建 Server 实例
let mcpServer = sessions.get(sessionId)
if (!mcpServer) {
mcpServer = createNewServer()
sessions.set(sessionId, mcpServer)
}
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => sessionId,
enableJsonResponse: true,
})
res.setHeader('x-session-id', sessionId)
res.on('close', () => transport.close())
await mcpServer.connect(transport)
await transport.handleRequest(req, res, req.body)
})
// 定期清理过期会话
setInterval(() => {
// 实现会话清理逻辑
}, 60000)

安全性考虑#

1. Origin 验证#

防止 DNS 重绑定攻击:

const ALLOWED_ORIGINS = ['https://example.com', 'http://localhost:3000']
app.use('/mcp', (req, res, next) => {
const origin = req.headers.origin
if (origin && !ALLOWED_ORIGINS.includes(origin)) {
res.status(403).json({ error: 'Origin not allowed' })
return
}
// 设置 CORS 头
if (origin) {
res.setHeader('Access-Control-Allow-Origin', origin)
}
res.setHeader('Access-Control-Allow-Methods', 'POST, OPTIONS')
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, Authorization')
if (req.method === 'OPTIONS') {
res.status(204).end()
return
}
next()
})

2. 认证#

import jwt from 'jsonwebtoken'
const JWT_SECRET = process.env.JWT_SECRET!
function authMiddleware(
req: express.Request,
res: express.Response,
next: express.NextFunction
) {
const authHeader = req.headers.authorization
if (!authHeader?.startsWith('Bearer ')) {
res.status(401).json({ error: 'Missing authorization token' })
return
}
const token = authHeader.slice(7)
try {
const decoded = jwt.verify(token, JWT_SECRET)
req.user = decoded
next()
} catch {
res.status(401).json({ error: 'Invalid token' })
}
}
app.post('/mcp', authMiddleware, async (req, res) => {
// 已认证的请求处理
})

3. 本地绑定#

本地服务应只监听 localhost:

// ✅ 安全:只监听本地
app.listen(3000, '127.0.0.1', () => {
console.log('Server listening on 127.0.0.1:3000')
})
// ❌ 危险:监听所有接口
app.listen(3000, '0.0.0.0') // 不要这样做(除非有必要)

生产环境部署#

Docker 部署#

# Dockerfile
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY dist ./dist
ENV NODE_ENV=production
ENV PORT=3000
EXPOSE 3000
USER node
CMD ["node", "dist/server.js"]
docker-compose.yml
version: '3.8'
services:
mcp-server:
build: .
ports:
- '3000:3000'
environment:
- NODE_ENV=production
- API_KEY=${API_KEY}
healthcheck:
test: ['CMD', 'curl', '-f', 'http://localhost:3000/health']
interval: 30s
timeout: 10s
retries: 3
restart: unless-stopped

健康检查端点#

app.get('/health', (req, res) => {
res.json({
status: 'healthy',
version: '1.0.0',
uptime: process.uptime(),
})
})

Nginx 反向代理#

/etc/nginx/conf.d/mcp.conf
upstream mcp_backend {
server 127.0.0.1:3000;
keepalive 32;
}
server {
listen 443 ssl http2;
server_name mcp.example.com;
ssl_certificate /etc/ssl/certs/mcp.crt;
ssl_certificate_key /etc/ssl/private/mcp.key;
location /mcp {
proxy_pass http://mcp_backend;
proxy_http_version 1.1;
# SSE 支持
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
# 超时设置
proxy_read_timeout 86400s;
proxy_send_timeout 86400s;
# 安全头
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}

PM2 进程管理#

ecosystem.config.js
module.exports = {
apps: [
{
name: 'mcp-server',
script: './dist/server.js',
instances: 'max',
exec_mode: 'cluster',
env_production: {
NODE_ENV: 'production',
PORT: 3000,
},
max_memory_restart: '500M',
error_file: './logs/error.log',
out_file: './logs/out.log',
merge_logs: true,
},
],
}
Terminal window
# 启动
pm2 start ecosystem.config.js --env production
# 查看状态
pm2 status
# 查看日志
pm2 logs mcp-server

完整 HTTP Server 示例#

src/server.ts
import {
McpServer,
ResourceTemplate,
} from '@modelcontextprotocol/sdk/server/mcp.js'
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'
import express from 'express'
import cors from 'cors'
import helmet from 'helmet'
import { z } from 'zod'
// 创建 MCP Server
function createMcpServer() {
const server = new McpServer({
name: 'production-server',
version: '1.0.0',
})
// 注册 Tools
server.registerTool(
'echo',
{
title: 'Echo',
description: '返回输入的消息',
inputSchema: { message: z.string() },
},
async ({ message }) => ({
content: [{ type: 'text', text: `Echo: ${message}` }],
})
)
// 注册 Resources
server.registerResource(
'status',
'status://server',
{ title: '服务状态', description: '服务器运行状态' },
async (uri) => ({
contents: [
{
uri: uri.href,
mimeType: 'application/json',
text: JSON.stringify(
{
status: 'running',
uptime: process.uptime(),
memory: process.memoryUsage(),
},
null,
2
),
},
],
})
)
return server
}
// Express 应用
const app = express()
// 安全中间件
app.use(helmet())
app.use(
cors({
origin: process.env.ALLOWED_ORIGINS?.split(',') || '*',
methods: ['POST', 'OPTIONS'],
allowedHeaders: ['Content-Type', 'Authorization'],
})
)
app.use(express.json({ limit: '10mb' }))
// 健康检查
app.get('/health', (req, res) => {
res.json({ status: 'healthy' })
})
// MCP 端点
app.post('/mcp', async (req, res) => {
const mcpServer = createMcpServer()
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: undefined,
enableJsonResponse: true,
})
res.on('close', () => {
transport.close()
})
try {
await mcpServer.connect(transport)
await transport.handleRequest(req, res, req.body)
} catch (error) {
console.error('MCP request error:', error)
if (!res.headersSent) {
res.status(500).json({ error: 'Internal server error' })
}
}
})
// 启动服务器
const PORT = parseInt(process.env.PORT || '3000')
const HOST = process.env.HOST || '127.0.0.1'
app.listen(PORT, HOST, () => {
console.log(`MCP Server running on http://${HOST}:${PORT}`)
console.log(`MCP endpoint: http://${HOST}:${PORT}/mcp`)
console.log(`Health check: http://${HOST}:${PORT}/health`)
})

package.json 配置#

{
"name": "mcp-http-server",
"version": "1.0.0",
"type": "module",
"scripts": {
"build": "tsc",
"start": "node dist/server.js",
"dev": "tsx watch src/server.ts"
},
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"cors": "^2.8.5",
"express": "^4.18.2",
"helmet": "^7.1.0",
"zod": "^3.22.0"
},
"devDependencies": {
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
"@types/node": "^20.10.0",
"tsx": "^4.6.0",
"typescript": "^5.3.0"
}
}

小结#

这篇文章我们深入学习了 MCP Transport:

✅ stdio Transport 工作原理与实现 ✅ Streamable HTTP Transport 配置 ✅ SSE 流式响应 ✅ 安全性考虑:Origin 验证、认证、本地绑定 ✅ 生产部署:Docker、Nginx、PM2

下一篇我们将学习如何构建 MCP Client,将 MCP Server 集成到你自己的应用中。

参考资料#