Skip to content

MCP Tools 深入:让 AI 具备行动能力

Tool 是 MCP 最核心的能力——它让 AI 从”只能说”变成”能做事”。这篇文章将深入探讨 Tool 的设计与实现。

🎯 Tool 的本质#

Tool 就是一个带有 schema 描述的异步函数:

// 伪代码表示 Tool 的结构
interface Tool {
name: string // 唯一标识
description: string // 告诉 AI 这个 Tool 做什么
inputSchema: Schema // 参数定义
outputSchema?: Schema // 返回值定义
handler: (args) => Promise<Result>
}

AI 通过阅读 descriptioninputSchema 来决定何时调用、如何传参。

Schema 设计#

好的 schema 设计直接影响 AI 对 Tool 的理解和使用准确率。

基础类型#

import { z } from 'zod'
// 字符串
z.string()
z.string().min(1).max(100) // 长度限制
z.string().email() // 邮箱格式
z.string().url() // URL 格式
// 数字
z.number()
z.number().int() // 整数
z.number().min(0).max(100) // 范围限制
z.number().positive() // 正数
// 布尔
z.boolean()
// 枚举
z.enum(['low', 'medium', 'high'])
// 可选
z.string().optional()
z.string().default('default value')

复杂类型#

// 数组
z.array(z.string()) // string[]
z.array(z.number()).min(1).max(10) // 限制数组长度
// 对象
z.object({
name: z.string(),
age: z.number().optional(),
})
// 联合类型
z.union([z.string(), z.number()])
// 记录类型
z.record(z.string(), z.number()) // { [key: string]: number }

描述的重要性#

// ❌ 不好:缺少描述
server.registerTool(
'search',
{
inputSchema: {
q: z.string(),
n: z.number(),
},
},
async ({ q, n }) => {
/* ... */
}
)
// ✅ 好:详细描述
server.registerTool(
'search_documents',
{
title: '文档搜索',
description: '在知识库中搜索相关文档,返回最匹配的结果',
inputSchema: {
query: z.string().min(1).describe('搜索关键词,支持自然语言查询'),
limit: z
.number()
.int()
.min(1)
.max(50)
.default(10)
.describe('返回结果数量上限'),
},
},
async ({ query, limit }) => {
/* ... */
}
)

🔶 关键description 和每个参数的 .describe() 是 AI 理解 Tool 的主要依据。

结构化输出#

Tool 可以返回两种格式的结果:

文本输出(给 AI 看)#

return {
content: [{ type: 'text', text: '搜索结果:找到 5 个相关文档...' }],
}

结构化输出(程序可用)#

return {
content: [{ type: 'text', text: JSON.stringify(output) }],
structuredContent: {
count: 5,
documents: [
{ id: '1', title: '文档A', score: 0.95 },
// ...
],
},
}

返回图片#

import { readFileSync } from 'fs'
return {
content: [
{
type: 'image',
data: readFileSync('chart.png').toString('base64'),
mimeType: 'image/png',
},
],
}

返回资源链接#

当数据量大时,可以返回 ResourceLink 而非嵌入内容:

return {
content: [
{ type: 'text', text: '生成了 3 个文件' },
{
type: 'resource_link',
uri: 'file:///output/report.pdf',
name: '分析报告',
mimeType: 'application/pdf',
},
],
}

错误处理#

标记错误结果#

server.registerTool(
'divide',
{
/* schema */
},
async ({ a, b }) => {
if (b === 0) {
return {
content: [{ type: 'text', text: '错误:除数不能为零' }],
isError: true, // 告知 AI 这是错误
}
}
// ...
}
)

异常处理#

server.registerTool(
'fetch_data',
{
/* schema */
},
async ({ url }) => {
try {
const response = await fetch(url)
if (!response.ok) {
return {
content: [{ type: 'text', text: `HTTP 错误:${response.status}` }],
isError: true,
}
}
const data = await response.json()
return {
content: [{ type: 'text', text: JSON.stringify(data, null, 2) }],
structuredContent: data,
}
} catch (error) {
return {
content: [
{
type: 'text',
text: `请求失败:${error instanceof Error ? error.message : '未知错误'}`,
},
],
isError: true,
}
}
}
)

异步操作与长时间任务#

进度通知#

对于耗时任务,可以发送进度通知:

server.registerTool(
'process_files',
{
inputSchema: {
files: z.array(z.string()).describe('要处理的文件列表'),
},
},
async ({ files }, { progressToken }) => {
const results = []
for (let i = 0; i < files.length; i++) {
// 发送进度通知
if (progressToken) {
await server.server.notification({
method: 'notifications/progress',
params: {
progressToken,
progress: i,
total: files.length,
message: `处理中:${files[i]}`,
},
})
}
// 实际处理逻辑
const result = await processFile(files[i])
results.push(result)
}
return {
content: [{ type: 'text', text: `完成处理 ${files.length} 个文件` }],
structuredContent: { results },
}
}
)

并发控制#

import pLimit from 'p-limit'
const limit = pLimit(5) // 最多 5 个并发
server.registerTool(
'batch_process',
{
/* schema */
},
async ({ items }) => {
const results = await Promise.all(
items.map((item) => limit(() => processItem(item)))
)
return {
content: [{ type: 'text', text: `处理完成:${results.length}` }],
structuredContent: { results },
}
}
)

动态 Tool 管理#

注册 Tool 后会返回一个句柄,可以用来动态管理:

启用/禁用#

const writeTool = server.registerTool(
'write_file',
{
/* schema */
},
async ({ path, content }) => {
/* ... */
}
)
// 初始禁用(不会出现在 listTools 结果中)
writeTool.disable()
// 根据权限启用
async function onUserAuthenticated(permissions: string[]) {
if (permissions.includes('write')) {
writeTool.enable()
}
}

更新 Tool#

const searchTool = server.registerTool(
'search',
{
inputSchema: {
query: z.string(),
scope: z.enum(['local', 'global']),
},
},
async (args) => {
/* ... */
}
)
// 后续添加新的搜索范围
searchTool.update({
inputSchema: {
query: z.string(),
scope: z.enum(['local', 'global', 'archive']), // 新增 archive
},
})
// SDK 会自动发送 listChanged 通知

移除 Tool#

// 完全移除 Tool
searchTool.remove()

实际场景:权限升级#

const viewTool = server.registerTool(
'view',
{
/* ... */
},
viewHandler
)
const editTool = server.registerTool(
'edit',
{
/* ... */
},
editHandler
)
const adminTool = server.registerTool(
'admin',
{
/* ... */
},
adminHandler
)
// 初始只有查看权限
editTool.disable()
adminTool.disable()
server.registerTool(
'upgrade_permission',
{
inputSchema: {
level: z.enum(['editor', 'admin']),
},
},
async ({ level }) => {
const success = await requestPermissionUpgrade(level)
if (!success) {
return {
content: [{ type: 'text', text: '权限升级失败' }],
isError: true,
}
}
if (level === 'editor') {
editTool.enable()
} else if (level === 'admin') {
editTool.enable()
adminTool.enable()
}
return {
content: [{ type: 'text', text: `已升级到 ${level} 权限` }],
}
}
)

最佳实践#

1. 命名规范#

// ✅ 好:动词_名词,清晰表达行为
'search_documents'
'create_user'
'send_email'
'get_weather'
// ❌ 不好:含糊或过于通用
'search' // 搜索什么?
'do_thing' // 做什么事?
'handler' // 不是动词

2. 单一职责#

// ❌ 不好:一个 Tool 做太多事
server.registerTool(
'manage_files',
{
/* ... */
},
async ({ action, path, content }) => {
if (action === 'read') {
/* ... */
}
if (action === 'write') {
/* ... */
}
if (action === 'delete') {
/* ... */
}
}
)
// ✅ 好:拆分成独立的 Tools
server.registerTool(
'read_file',
{
/* ... */
},
readHandler
)
server.registerTool(
'write_file',
{
/* ... */
},
writeHandler
)
server.registerTool(
'delete_file',
{
/* ... */
},
deleteHandler
)

3. 参数验证#

// 使用 Zod 的 refine 进行复杂验证
server.registerTool(
'create_event',
{
inputSchema: {
title: z.string().min(1).max(100),
startTime: z.string().datetime(),
endTime: z.string().datetime(),
},
},
async ({ title, startTime, endTime }) => {
// 额外验证
if (new Date(endTime) <= new Date(startTime)) {
return {
content: [{ type: 'text', text: '结束时间必须晚于开始时间' }],
isError: true,
}
}
// ...
}
)

4. 幂等性设计#

对于写操作,考虑幂等性:

server.registerTool(
'ensure_directory',
{
description: '确保目录存在,如已存在则不做任何操作',
inputSchema: {
path: z.string().describe('目录路径'),
},
},
async ({ path }) => {
await fs.mkdir(path, { recursive: true })
return {
content: [{ type: 'text', text: `目录已就绪:${path}` }],
}
}
)

5. 日志记录#

server.registerTool(
'important_action',
{
/* ... */
},
async (args) => {
console.error(`[Tool] important_action called with:`, JSON.stringify(args))
try {
const result = await doImportantThing(args)
console.error(`[Tool] important_action succeeded`)
return { content: [{ type: 'text', text: 'Done' }] }
} catch (error) {
console.error(`[Tool] important_action failed:`, error)
throw error
}
}
)

完整示例:文件操作 Tools#

import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'
import { z } from 'zod'
import { readFile, writeFile, readdir, stat } from 'fs/promises'
import { join, resolve } from 'path'
const server = new McpServer({
name: 'file-tools',
version: '1.0.0',
})
// 安全的路径解析(防止路径遍历攻击)
const BASE_DIR = '/allowed/directory'
function safePath(userPath: string): string {
const resolved = resolve(BASE_DIR, userPath)
if (!resolved.startsWith(BASE_DIR)) {
throw new Error('不允许访问该路径')
}
return resolved
}
// 列出目录
server.registerTool(
'list_directory',
{
title: '列出目录',
description: '列出指定目录下的文件和子目录',
inputSchema: {
path: z.string().default('.').describe('相对于工作目录的路径'),
},
outputSchema: {
entries: z.array(
z.object({
name: z.string(),
type: z.enum(['file', 'directory']),
size: z.number().optional(),
})
),
},
},
async ({ path }) => {
try {
const fullPath = safePath(path)
const entries = await readdir(fullPath, { withFileTypes: true })
const result = await Promise.all(
entries.map(async (entry) => {
const entryPath = join(fullPath, entry.name)
const stats = await stat(entryPath)
return {
name: entry.name,
type: entry.isDirectory() ? 'directory' : 'file',
size: entry.isFile() ? stats.size : undefined,
}
})
)
return {
content: [
{
type: 'text',
text: result
.map((e) => `${e.type === 'directory' ? '📁' : '📄'} ${e.name}`)
.join('\n'),
},
],
structuredContent: { entries: result },
}
} catch (error) {
return {
content: [{ type: 'text', text: `错误:${error}` }],
isError: true,
}
}
}
)
// 读取文件
server.registerTool(
'read_file',
{
title: '读取文件',
description: '读取文本文件内容',
inputSchema: {
path: z.string().describe('文件路径'),
encoding: z.enum(['utf-8', 'ascii', 'base64']).default('utf-8'),
},
},
async ({ path, encoding }) => {
try {
const fullPath = safePath(path)
const content = await readFile(fullPath, encoding as BufferEncoding)
return {
content: [{ type: 'text', text: content }],
}
} catch (error) {
return {
content: [{ type: 'text', text: `读取失败:${error}` }],
isError: true,
}
}
}
)
// 写入文件
server.registerTool(
'write_file',
{
title: '写入文件',
description: '将内容写入文件,如文件存在则覆盖',
inputSchema: {
path: z.string().describe('文件路径'),
content: z.string().describe('要写入的内容'),
},
},
async ({ path, content }) => {
try {
const fullPath = safePath(path)
await writeFile(fullPath, content, 'utf-8')
return {
content: [{ type: 'text', text: `已写入:${path}` }],
}
} catch (error) {
return {
content: [{ type: 'text', text: `写入失败:${error}` }],
isError: true,
}
}
}
)
const transport = new StdioServerTransport()
await server.connect(transport)

小结#

这篇文章我们深入学习了 MCP Tool 的高级用法:

✅ Schema 设计原则与最佳实践 ✅ 多种输出格式:文本、结构化、图片、资源链接 ✅ 错误处理与异常捕获 ✅ 异步操作与进度通知 ✅ 动态 Tool 管理:启用、禁用、更新、移除 ✅ 命名规范与设计原则

下一篇我们将探索 MCP Resources,学习如何向 AI 暴露数据。

参考资料#