Tool 是 MCP 最核心的能力——它让 AI 从”只能说”变成”能做事”。这篇文章将深入探讨 Tool 的设计与实现。
🎯 Tool 的本质#
Tool 就是一个带有 schema 描述的异步函数:
// 伪代码表示 Tool 的结构interface Tool { name: string // 唯一标识 description: string // 告诉 AI 这个 Tool 做什么 inputSchema: Schema // 参数定义 outputSchema?: Schema // 返回值定义 handler: (args) => Promise<Result>}AI 通过阅读 description 和 inputSchema 来决定何时调用、如何传参。
Schema 设计#
好的 schema 设计直接影响 AI 对 Tool 的理解和使用准确率。
基础类型#
import { z } from 'zod'
// 字符串z.string()z.string().min(1).max(100) // 长度限制z.string().email() // 邮箱格式z.string().url() // URL 格式
// 数字z.number()z.number().int() // 整数z.number().min(0).max(100) // 范围限制z.number().positive() // 正数
// 布尔z.boolean()
// 枚举z.enum(['low', 'medium', 'high'])
// 可选z.string().optional()z.string().default('default value')复杂类型#
// 数组z.array(z.string()) // string[]z.array(z.number()).min(1).max(10) // 限制数组长度
// 对象z.object({ name: z.string(), age: z.number().optional(),})
// 联合类型z.union([z.string(), z.number()])
// 记录类型z.record(z.string(), z.number()) // { [key: string]: number }描述的重要性#
// ❌ 不好:缺少描述server.registerTool( 'search', { inputSchema: { q: z.string(), n: z.number(), }, }, async ({ q, n }) => { /* ... */ })
// ✅ 好:详细描述server.registerTool( 'search_documents', { title: '文档搜索', description: '在知识库中搜索相关文档,返回最匹配的结果', inputSchema: { query: z.string().min(1).describe('搜索关键词,支持自然语言查询'), limit: z .number() .int() .min(1) .max(50) .default(10) .describe('返回结果数量上限'), }, }, async ({ query, limit }) => { /* ... */ })🔶 关键:description 和每个参数的 .describe() 是 AI 理解 Tool 的主要依据。
结构化输出#
Tool 可以返回两种格式的结果:
文本输出(给 AI 看)#
return { content: [{ type: 'text', text: '搜索结果:找到 5 个相关文档...' }],}结构化输出(程序可用)#
return { content: [{ type: 'text', text: JSON.stringify(output) }], structuredContent: { count: 5, documents: [ { id: '1', title: '文档A', score: 0.95 }, // ... ], },}返回图片#
import { readFileSync } from 'fs'
return { content: [ { type: 'image', data: readFileSync('chart.png').toString('base64'), mimeType: 'image/png', }, ],}返回资源链接#
当数据量大时,可以返回 ResourceLink 而非嵌入内容:
return { content: [ { type: 'text', text: '生成了 3 个文件' }, { type: 'resource_link', uri: 'file:///output/report.pdf', name: '分析报告', mimeType: 'application/pdf', }, ],}错误处理#
标记错误结果#
server.registerTool( 'divide', { /* schema */ }, async ({ a, b }) => { if (b === 0) { return { content: [{ type: 'text', text: '错误:除数不能为零' }], isError: true, // 告知 AI 这是错误 } } // ... })异常处理#
server.registerTool( 'fetch_data', { /* schema */ }, async ({ url }) => { try { const response = await fetch(url) if (!response.ok) { return { content: [{ type: 'text', text: `HTTP 错误:${response.status}` }], isError: true, } } const data = await response.json() return { content: [{ type: 'text', text: JSON.stringify(data, null, 2) }], structuredContent: data, } } catch (error) { return { content: [ { type: 'text', text: `请求失败:${error instanceof Error ? error.message : '未知错误'}`, }, ], isError: true, } } })异步操作与长时间任务#
进度通知#
对于耗时任务,可以发送进度通知:
server.registerTool( 'process_files', { inputSchema: { files: z.array(z.string()).describe('要处理的文件列表'), }, }, async ({ files }, { progressToken }) => { const results = []
for (let i = 0; i < files.length; i++) { // 发送进度通知 if (progressToken) { await server.server.notification({ method: 'notifications/progress', params: { progressToken, progress: i, total: files.length, message: `处理中:${files[i]}`, }, }) }
// 实际处理逻辑 const result = await processFile(files[i]) results.push(result) }
return { content: [{ type: 'text', text: `完成处理 ${files.length} 个文件` }], structuredContent: { results }, } })并发控制#
import pLimit from 'p-limit'
const limit = pLimit(5) // 最多 5 个并发
server.registerTool( 'batch_process', { /* schema */ }, async ({ items }) => { const results = await Promise.all( items.map((item) => limit(() => processItem(item))) ) return { content: [{ type: 'text', text: `处理完成:${results.length} 项` }], structuredContent: { results }, } })动态 Tool 管理#
注册 Tool 后会返回一个句柄,可以用来动态管理:
启用/禁用#
const writeTool = server.registerTool( 'write_file', { /* schema */ }, async ({ path, content }) => { /* ... */ })
// 初始禁用(不会出现在 listTools 结果中)writeTool.disable()
// 根据权限启用async function onUserAuthenticated(permissions: string[]) { if (permissions.includes('write')) { writeTool.enable() }}更新 Tool#
const searchTool = server.registerTool( 'search', { inputSchema: { query: z.string(), scope: z.enum(['local', 'global']), }, }, async (args) => { /* ... */ })
// 后续添加新的搜索范围searchTool.update({ inputSchema: { query: z.string(), scope: z.enum(['local', 'global', 'archive']), // 新增 archive },})// SDK 会自动发送 listChanged 通知移除 Tool#
// 完全移除 ToolsearchTool.remove()实际场景:权限升级#
const viewTool = server.registerTool( 'view', { /* ... */ }, viewHandler)const editTool = server.registerTool( 'edit', { /* ... */ }, editHandler)const adminTool = server.registerTool( 'admin', { /* ... */ }, adminHandler)
// 初始只有查看权限editTool.disable()adminTool.disable()
server.registerTool( 'upgrade_permission', { inputSchema: { level: z.enum(['editor', 'admin']), }, }, async ({ level }) => { const success = await requestPermissionUpgrade(level) if (!success) { return { content: [{ type: 'text', text: '权限升级失败' }], isError: true, } }
if (level === 'editor') { editTool.enable() } else if (level === 'admin') { editTool.enable() adminTool.enable() }
return { content: [{ type: 'text', text: `已升级到 ${level} 权限` }], } })最佳实践#
1. 命名规范#
// ✅ 好:动词_名词,清晰表达行为'search_documents''create_user''send_email''get_weather'
// ❌ 不好:含糊或过于通用'search' // 搜索什么?'do_thing' // 做什么事?'handler' // 不是动词2. 单一职责#
// ❌ 不好:一个 Tool 做太多事server.registerTool( 'manage_files', { /* ... */ }, async ({ action, path, content }) => { if (action === 'read') { /* ... */ } if (action === 'write') { /* ... */ } if (action === 'delete') { /* ... */ } })
// ✅ 好:拆分成独立的 Toolsserver.registerTool( 'read_file', { /* ... */ }, readHandler)server.registerTool( 'write_file', { /* ... */ }, writeHandler)server.registerTool( 'delete_file', { /* ... */ }, deleteHandler)3. 参数验证#
// 使用 Zod 的 refine 进行复杂验证server.registerTool( 'create_event', { inputSchema: { title: z.string().min(1).max(100), startTime: z.string().datetime(), endTime: z.string().datetime(), }, }, async ({ title, startTime, endTime }) => { // 额外验证 if (new Date(endTime) <= new Date(startTime)) { return { content: [{ type: 'text', text: '结束时间必须晚于开始时间' }], isError: true, } } // ... })4. 幂等性设计#
对于写操作,考虑幂等性:
server.registerTool( 'ensure_directory', { description: '确保目录存在,如已存在则不做任何操作', inputSchema: { path: z.string().describe('目录路径'), }, }, async ({ path }) => { await fs.mkdir(path, { recursive: true }) return { content: [{ type: 'text', text: `目录已就绪:${path}` }], } })5. 日志记录#
server.registerTool( 'important_action', { /* ... */ }, async (args) => { console.error(`[Tool] important_action called with:`, JSON.stringify(args))
try { const result = await doImportantThing(args) console.error(`[Tool] important_action succeeded`) return { content: [{ type: 'text', text: 'Done' }] } } catch (error) { console.error(`[Tool] important_action failed:`, error) throw error } })完整示例:文件操作 Tools#
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'import { z } from 'zod'import { readFile, writeFile, readdir, stat } from 'fs/promises'import { join, resolve } from 'path'
const server = new McpServer({ name: 'file-tools', version: '1.0.0',})
// 安全的路径解析(防止路径遍历攻击)const BASE_DIR = '/allowed/directory'
function safePath(userPath: string): string { const resolved = resolve(BASE_DIR, userPath) if (!resolved.startsWith(BASE_DIR)) { throw new Error('不允许访问该路径') } return resolved}
// 列出目录server.registerTool( 'list_directory', { title: '列出目录', description: '列出指定目录下的文件和子目录', inputSchema: { path: z.string().default('.').describe('相对于工作目录的路径'), }, outputSchema: { entries: z.array( z.object({ name: z.string(), type: z.enum(['file', 'directory']), size: z.number().optional(), }) ), }, }, async ({ path }) => { try { const fullPath = safePath(path) const entries = await readdir(fullPath, { withFileTypes: true })
const result = await Promise.all( entries.map(async (entry) => { const entryPath = join(fullPath, entry.name) const stats = await stat(entryPath) return { name: entry.name, type: entry.isDirectory() ? 'directory' : 'file', size: entry.isFile() ? stats.size : undefined, } }) )
return { content: [ { type: 'text', text: result .map((e) => `${e.type === 'directory' ? '📁' : '📄'} ${e.name}`) .join('\n'), }, ], structuredContent: { entries: result }, } } catch (error) { return { content: [{ type: 'text', text: `错误:${error}` }], isError: true, } } })
// 读取文件server.registerTool( 'read_file', { title: '读取文件', description: '读取文本文件内容', inputSchema: { path: z.string().describe('文件路径'), encoding: z.enum(['utf-8', 'ascii', 'base64']).default('utf-8'), }, }, async ({ path, encoding }) => { try { const fullPath = safePath(path) const content = await readFile(fullPath, encoding as BufferEncoding) return { content: [{ type: 'text', text: content }], } } catch (error) { return { content: [{ type: 'text', text: `读取失败:${error}` }], isError: true, } } })
// 写入文件server.registerTool( 'write_file', { title: '写入文件', description: '将内容写入文件,如文件存在则覆盖', inputSchema: { path: z.string().describe('文件路径'), content: z.string().describe('要写入的内容'), }, }, async ({ path, content }) => { try { const fullPath = safePath(path) await writeFile(fullPath, content, 'utf-8') return { content: [{ type: 'text', text: `已写入:${path}` }], } } catch (error) { return { content: [{ type: 'text', text: `写入失败:${error}` }], isError: true, } } })
const transport = new StdioServerTransport()await server.connect(transport)小结#
这篇文章我们深入学习了 MCP Tool 的高级用法:
✅ Schema 设计原则与最佳实践 ✅ 多种输出格式:文本、结构化、图片、资源链接 ✅ 错误处理与异常捕获 ✅ 异步操作与进度通知 ✅ 动态 Tool 管理:启用、禁用、更新、移除 ✅ 命名规范与设计原则
下一篇我们将探索 MCP Resources,学习如何向 AI 暴露数据。