标签: OpenClaw

  • MCP 服务器开发与企业集成:从零构建生产级 AI 工具网关

    # MCP 服务器开发与企业集成:从零构建生产级 AI 工具网关

    > **技术栈:** Python / TypeScript · MCP SDK · SSE/Streamable HTTP · 企业安全体系
    > **难度:** ⭐⭐⭐⭐
    > **阅读时间:** 25 分钟
    > **适合人群:** 后端工程师、AI 应用架构师、平台开发者

    ## 一、为什么 MCP 正在改变 AI 集成的游戏规则?

    2024 年底,Anthropic 开源了 **Model Context Protocol (MCP)**,迅速成为 AI 工具集成的事实标准。截至 2026 年 5 月,MCP 已迭代至 2025-04 规范版本,支持 **Streamable HTTP** 传输层,被 OpenAI、Claude、VSCode 等主流平台原生采纳。

    ### 1.1 传统 AI 集成的痛点

    “`mermaid
    flowchart LR
    A[AI Agent] –>|Function Calling| B[自定义工具函数]
    B –> C[API A]
    B –> D[API B]
    B –> E[API C]
    style A fill:#4a90d9,color:#fff
    style B fill:#e67e22,color:#fff
    “`

    每个 Agent 都需要:
    – 手写 Function Calling 定义(JSON Schema)
    – 自行处理认证、限流、错误重试
    – 为每个 LLM 平台重新实现一遍工具层
    – 缺乏标准化的工具发现与生命周期管理

    ### 1.2 MCP 的核心设计哲学

    MCP 采用 **客户端-服务器** 架构,让 LLM 应用通过标准协议发现和调用工具:

    “`
    ┌──────────────┐ MCP Protocol ┌──────────────┐
    │ MCP Client │ ◄────────────────────► │ MCP Server │
    │ (Claude, IDE, │ (JSON-RPC 2.0) │ (你的服务) │
    │ Agent 框架) │ │ │
    └──────────────┘ └──────────────┘
    “`

    **三大核心能力:**
    – **Tools** — 可执行的函数,LLM 自主调用
    – **Resources** — 暴露数据资源(文件、数据库查询)
    – **Prompts** — 可复用的提示模板

    **关键优势:**
    1. **一次开发,到处运行** — 一个 MCP 服务器同时服务 Claude、Cursor、VS Code、自定义 Agent
    2. **动态工具发现** — 客户端自动获取工具列表和 Schema
    3. **标准化传输** — stdio(进程内)、SSE、Streamable HTTP
    4. **安全边界** — 服务器可控制暴露哪些资源和能力

    ## 二、MCP 核心协议深度解析

    ### 2.1 协议基础

    MCP 基于 **JSON-RPC 2.0**,所有通信通过消息交换完成:

    “`typescript
    // 基础消息结构
    interface JSONRPCRequest {
    jsonrpc: “2.0”;
    id: number | string;
    method: string;
    params?: Record;
    }

    interface JSONRPCResponse {
    jsonrpc: “2.0”;
    id: number | string;
    result?: any;
    error?: {
    code: number;
    message: string;
    data?: any;
    };
    }

    // 通知(无响应)
    interface JSONRPCNotification {
    jsonrpc: “2.0”;
    method: string;
    params?: Record;
    }
    “`

    ### 2.2 生命周期与方法清单

    “`
    Session Lifecycle:
    1. Initialize (client → server)
    – 协议版本协商
    – 能力声明
    2. Initialized (client → server) — 通知
    3. 正常通信
    4. Shutdown / 连接断开
    “`

    **核心方法一览:**

    | 方向 | 方法 | 作用 |
    |——|——|——|
    | C→S | `initialize` | 握手协商 |
    | C→S | `tools/list` | 获取可用工具列表 |
    | C→S | `tools/call` | 调用指定工具 |
    | C→S | `resources/list` | 列出可用资源 |
    | C→S | `resources/read` | 读取资源内容 |
    | C→S | `prompts/list` | 列出提示模板 |
    | C→S | `prompts/get` | 获取具体提示 |
    | S→C | `notifications/tools/list_changed` | 工具列表变更通知 |
    | S→C | `logging/message` | 日志输出 |

    ### 2.3 传输层对比

    | 传输方式 | 适合场景 | 优点 | 缺点 |
    |———-|———|——|——|
    | **stdio** | 本地开发、CLI 工具、VSCode 扩展 | 零配置、低延迟 | 绑定进程生命周期 |
    | **SSE** | 远程服务、Web 集成 | 实时推送、传统兼容 | 需要事件流客户端 |
    | **Streamable HTTP** | 现代应用(2025-04+) | 简化传输、标准 HTTP | 需要更成熟的规范实现 |

    **Streamable HTTP (2025-04 新规范)** 将传输层简化为标准 HTTP POST:
    – 客户端正常 POST 请求即可
    – 需要流式推送时使用 SSE
    – 大幅降低了客户端实现门槛

    ## 三、从零搭建一个生产级 MCP 服务器

    我们以 **Python** 为例,构建一个企业级数据查询 MCP 服务器。

    ### 3.1 项目骨架

    “`bash
    mkdir mcp-enterprise-server
    cd mcp-enterprise-server
    python -m venv .venv
    source .venv/bin/activate
    pip install mcp httpx python-dotenv pydantic
    “`

    ### 3.2 基础服务器实现

    “`python
    # server.py
    import asyncio
    import json
    import logging
    from typing import Any
    from mcp.server import Server, NotificationOptions
    from mcp.server.models import InitializationOptions
    import mcp.server.stdio
    import mcp.types as types

    # 配置日志
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(“mcp-enterprise”)

    # 创建服务器实例
    server = Server(“enterprise-gateway”)

    # ─── 工具处理 ───────────────────────────────────────

    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
    “””动态注册所有可用工具”””
    return [
    types.Tool(
    name=”query-database”,
    description=”执行 SQL 查询并返回结构化结果。自动脱敏敏感字段。”,
    inputSchema={
    “type”: “object”,
    “properties”: {
    “sql”: {
    “type”: “string”,
    “description”: “SQL 查询语句”,
    },
    “limit”: {
    “type”: “integer”,
    “description”: “最大返回行数”,
    “default”: 100,
    “maximum”: 1000,
    },
    },
    “required”: [“sql”],
    },
    ),
    types.Tool(
    name=”search-documentation”,
    description=”在企业知识库中搜索相关文档”,
    inputSchema={
    “type”: “object”,
    “properties”: {
    “query”: {
    “type”: “string”,
    “description”: “搜索关键词”,
    },
    “top_k”: {
    “type”: “integer”,
    “description”: “返回结果数量”,
    “default”: 5,
    },
    },
    “required”: [“query”],
    },
    ),
    types.Tool(
    name=”call-rest-api”,
    description=”调用内部 REST API,自动处理认证和重试”,
    inputSchema={
    “type”: “object”,
    “properties”: {
    “endpoint”: {
    “type”: “string”,
    “description”: “API 端点路径(相对 base_url)”,
    },
    “method”: {
    “type”: “string”,
    “enum”: [“GET”, “POST”, “PUT”, “DELETE”],
    “description”: “HTTP 方法”,
    },
    “body”: {
    “type”: “object”,
    “description”: “请求体(可选)”,
    },
    },
    “required”: [“endpoint”, “method”],
    },
    ),
    ]

    @server.call_tool()
    async def handle_call_tool(
    name: str, arguments: dict[str, Any] | None
    ) -> list[types.TextContent]:
    “””工具调用入口——统一路由”””
    logger.info(f”Tool called: {name} with args={arguments}”)

    if name == “query-database”:
    return await execute_database_query(arguments)
    elif name == “search-documentation”:
    return await search_knowledge_base(arguments)
    elif name == “call-rest-api”:
    return await proxy_rest_api(arguments)
    else:
    raise ValueError(f”未知工具: {name}”)

    # ─── 工具实现 ────────────────────────────────────────

    async def execute_database_query(args: dict) -> list[types.TextContent]:
    “””数据库查询(此处为模拟)”””
    sql = args.get(“sql”, “”)
    limit = min(args.get(“limit”, 100), 1000)

    # TODO: 接入真实数据库连接池
    # 安全审计: 记录所有 SQL 查询
    logger.info(f”[AUDIT] SQL query: {sql[:200]}”)

    mock_result = {
    “columns”: [“id”, “name”, “status”, “created_at”],
    “rows”: [
    [1, “示例记录A”, “active”, “2026-01-15”],
    [2, “示例记录B”, “inactive”, “2026-02-20”],
    ],
    “total”: 2,
    “truncated”: False,
    “duration_ms”: 12,
    }

    return [types.TextContent(
    type=”text”,
    text=json.dumps(mock_result, ensure_ascii=False, indent=2),
    )]

    async def search_knowledge_base(args: dict) -> list[types.TextContent]:
    “””向量知识库搜索”””
    query = args.get(“query”, “”)
    top_k = min(args.get(“top_k”, 5), 20)

    # TODO: 接入向量数据库
    results = [
    {“title”: “部署指南”, “score”: 0.95, “url”: “/docs/deploy”},
    {“title”: “API 参考”, “score”: 0.89, “url”: “/docs/api”},
    ]

    return [types.TextContent(
    type=”text”,
    text=json.dumps({
    “query”: query,
    “results”: results[:top_k],
    “total_hits”: len(results),
    }, ensure_ascii=False, indent=2),
    )]

    async def proxy_rest_api(args: dict) -> list[types.TextContent]:
    “””代理 REST API 调用”””
    endpoint = args[“endpoint”]
    method = args[“method”]
    body = args.get(“body”)

    # 使用 httpx 进行调用
    import httpx

    async with httpx.AsyncClient(
    base_url=”https://api.internal.example.com”,
    timeout=30.0,
    ) as client:
    try:
    response = await client.request(
    method=method,
    url=endpoint,
    json=body,
    headers={
    “Authorization”: “Bearer ${INTERNAL_API_KEY}”,
    “X-Request-Id”: “mcp-” + str(asyncio.get_running_loop().time()),
    },
    )
    response.raise_for_status()
    data = response.json()
    except httpx.HTTPError as e:
    return [types.TextContent(
    type=”text”,
    text=json.dumps({
    “error”: str(e),
    “status_code”: getattr(e.response, “status_code”, None),
    }),
    )]

    return [types.TextContent(
    type=”text”,
    text=json.dumps(data, ensure_ascii=False, indent=2),
    )]

    # ─── 资源处理(可选)─────────────────────────────────

    @server.list_resources()
    async def handle_list_resources() -> list[types.Resource]:
    return [
    types.Resource(
    uri=”company://policies/acceptable-use”,
    name=”可接受使用政策”,
    description=”企业 AI 使用政策”,
    mimeType=”text/markdown”,
    ),
    ]

    @server.read_resource()
    async def handle_read_resource(uri: str) -> str:
    if uri == “company://policies/acceptable-use”:
    return “# 可接受使用政策nnAI 工具仅可用于经授权的企业业务场景…”
    raise ValueError(f”未知资源: {uri}”)

    # ─── 启动入口 ────────────────────────────────────────

    async def main():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
    await server.run(
    read_stream,
    write_stream,
    InitializationOptions(
    server_name=”enterprise-gateway”,
    server_version=”1.0.0″,
    capabilities=server.get_capabilities(
    notification_options=NotificationOptions(),
    experimental_capabilities={},
    ),
    ),
    )

    if __name__ == “__main__”:
    asyncio.run(main())
    “`

    ### 3.3 使用 Streamable HTTP(2025-04 规范)

    如果想用 HTTP 暴露服务,使用 **MCP 官方 HTTP 传输**:

    “`python
    from mcp.server.http import HTTPServerTransport
    from starlette.applications import Starlette
    from starlette.routing import Route
    import uvicorn

    # 将 MCP Server 绑定到 HTTP
    http_transport = HTTPServerTransport(
    server=server,
    path=”/mcp”,
    )

    app = Starlette(routes=[
    Route(“/mcp”, endpoint=http_transport.handle_request, methods=[“POST”]),
    Route(“/health”, endpoint=lambda r: {“status”: “ok”}),
    ])

    if __name__ == “__main__”:
    uvicorn.run(app, host=”0.0.0.0″, port=8000)
    “`

    ### 3.4 客户端连接示例

    “`python
    # client.py — 测试连接
    import asyncio
    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    async def main():
    server_params = StdioServerParameters(
    command=”python”,
    args=[“server.py”],
    )

    async with stdio_client(server_params) as (read, write):
    async with ClientSession(read, write) as session:
    # 1. 初始化
    await session.initialize()

    # 2. 获取工具列表
    tools = await session.list_tools()
    print(f”可用工具: {[t.name for t in tools.tools]}”)

    # 3. 调用工具
    result = await session.call_tool(
    “query-database”,
    {“sql”: “SELECT * FROM users LIMIT 5″}
    )
    print(f”结果: {result.content[0].text}”)

    asyncio.run(main())
    “`

    ## 四、企业级集成架构

    现实场景中,MCP 服务器不会直接暴露给外部。以下是一个经过生产验证的架构设计:

    “`
    ┌────────────────────────────────────────────────────────┐
    │ 客户端层 │
    │ ┌────────┐ ┌────────┐ ┌────────┐ ┌─────────────┐ │
    │ │ Claude │ │ Cursor │ │ VS Code│ │ 自定义 Agent │ │
    │ └───┬────┘ └───┬────┘ └───┬────┘ └──────┬──────┘ │
    └──────┼───────────┼───────────┼───────────────┼─────────┘
    │ │ │ │
    ▼ ▼ ▼ ▼
    ┌────────────────────────────────────────────────────────┐
    │ MCP Gateway (1:N) │
    │ ┌─────────────────────────────────────────────────┐ │
    │ │ 认证层: JWT / OAuth2 / API Key │ │
    │ │ 限流层: 令牌桶 + 用户级配额 │ │
    │ │ 审计层: 所有请求/响应全量日志 │ │
    │ │ 路由层: 按能力分发到不同 MCP Server │ │
    │ └─────────────────────────────────────────────────┘ │
    └──────┬─────────────────────────────────────┬───────────┘
    │ │
    ▼ ▼
    ┌──────────────┐ ┌──────────────┐
    │ MCP Server 1 │ │ MCP Server 2 │
    │ (数据库服务) │ │ (搜索服务) │
    │ │ │ │
    │ ┌─────────┐ │ │ ┌─────────┐ │
    │ │ PostgreSQL│ │ │ │Elasticsearch│ │
    │ │ MySQL │ │ │ │向量库 │ │
    │ │ 数据湖 │ │ │ └─────────┘ │
    │ └─────────┘ │ └──────────────┘
    └──────────────┘
    “`

    ### 4.1 MCP Gateway 实现

    “`python
    # gateway.py — 企业级 MCP 网关
    import json
    import time
    import hashlib
    import asyncio
    from dataclasses import dataclass, field
    from typing import Dict, List, Optional

    @dataclass
    class RateLimitBucket:
    tokens: float
    last_refill: float

    class MCPGateway:
    “””MCP 网关——统一入口、认证、限流、审计”””

    def __init__(self, config: dict):
    self.config = config
    self.rate_limiters: Dict[str, RateLimitBucket] = {}
    self.audit_log: List[dict] = []
    self.backend_servers: Dict[str, dict] = {
    “database”: {
    “type”: “stdio”,
    “command”: “python”,
    “args”: [“servers/database_server.py”],
    },
    “knowledge”: {
    “type”: “http”,
    “url”: “http://localhost:8001/mcp”,
    },
    “monitoring”: {
    “type”: “http”,
    “url”: “http://localhost:8002/mcp”,
    },
    }

    def authenticate(self, headers: dict) -> Optional[str]:
    “””认证——支持 API Key 和 JWT”””
    api_key = headers.get(“X-API-Key”, “”)
    auth_header = headers.get(“Authorization”, “”)

    # API Key 校验
    if api_key in self.config.get(“api_keys”, {}):
    return self.config[“api_keys”][api_key]

    # JWT 校验
    if auth_header.startswith(“Bearer “):
    token = auth_header[7:]
    # TODO: 验证 JWT 签名
    return “jwt-user”

    return None

    def check_rate_limit(self, user_id: str) -> bool:
    “””令牌桶限流”””
    now = time.time()
    bucket = self.rate_limiters.get(user_id)

    if not bucket:
    self.rate_limiters[user_id] = RateLimitBucket(
    tokens=self.config.get(“rate_limit”, 60),
    last_refill=now,
    )
    return True

    # 补充令牌
    elapsed = now – bucket.last_refill
    bucket.tokens += elapsed * (self.config.get(“rate_limit”, 60) / 60.0)
    bucket.tokens = min(bucket.tokens, self.config.get(“rate_limit”, 60))
    bucket.last_refill = now

    if bucket.tokens 10000:
    self.audit_log = self.audit_log[-5000:]

    # 发送到外部审计系统
    # asyncio.create_task(send_to_audit_system(entry))

    async def route_and_call(
    self, user: str, tool_name: str, arguments: dict
    ) -> dict:
    “””路由到对应的后端 MCP 服务器”””
    # 工具 → 后端映射
    tool_to_server = {
    “query-database”: “database”,
    “search-documentation”: “knowledge”,
    “call-rest-api”: “monitoring”,
    “analyze-logs”: “monitoring”,
    }

    server_name = tool_to_server.get(tool_name)
    if not server_name:
    return {“error”: f”未知工具: {tool_name}”}

    server_config = self.backend_servers[server_name]

    if server_config[“type”] == “stdio”:
    return await self._call_stdio_server(
    server_config, tool_name, arguments
    )
    else:
    return await self._call_http_server(
    server_config, tool_name, arguments
    )
    “`

    ## 五、安全最佳实践

    ### 5.1 输入净化与 SQL 注入防护

    “`python
    import re
    from typing import List

    class SQLGuard:
    “””SQL 安全守卫——防止 AI 生成恶意查询”””

    # 禁止的 SQL 关键词
    FORBIDDEN_PATTERNS = [
    r”bDROPb”,
    r”bTRUNCATEb”,
    r”bDELETEb(?!s+FROMs+w+s+WHERE)”, # 允许有 WHERE 的 DELETE
    r”bALTERb”,
    r”bCREATEs+USERb”,
    r”bGRANTb”,
    r”bEXECb”,
    r”bEXECUTEb”,
    r”bINTOs+OUTFILEb”,
    r”bLOADs+FILEb”,
    r”;s*$”, # 禁止多语句
    ]

    @classmethod
    def validate_query(cls, sql: str) -> tuple[bool, str]:
    “””返回 (是否通过, 错误信息)”””
    sql_upper = sql.upper()

    for pattern in cls.FORBIDDEN_PATTERNS:
    if re.search(pattern, sql_upper):
    return False, f”查询包含禁止操作: {pattern}”

    if len(sql) > 5000:
    return False, “查询长度超过限制(5000字符)”

    return True, “”

    @classmethod
    def apply_readonly(cls, sql: str) -> str:
    “””强制转为只读查询”””
    sql = sql.strip().rstrip(“;”)
    if not sql.upper().startswith(“SELECT”):
    sql = f”SELECT * FROM ({sql}) AS _subquery_ LIMIT 0″
    return sql
    “`

    ### 5.2 敏感数据脱敏

    “`python
    import re
    from typing import Any, Dict

    class DataMasker:
    “””自动脱敏工具”””

    SENSITIVE_PATTERNS = {
    “phone”: r”1[3-9]d{9}”,
    “email”: r”b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+.[A-Za-z]{2,}b”,
    “id_card”: r”d{18}|d{17}X”,
    “credit_card”: r”d{4}[-s]?d{4}[-s]?d{4}[-s]?d{4}”,
    }

    @classmethod
    def mask_value(cls, key: str, value: str) -> str:
    “””根据字段名或内容进行脱敏”””
    # 按字段名匹配
    key_lower = key.lower()
    if any(kw in key_lower for kw in [“password”, “secret”, “token”, “key”]):
    return “******”
    if “phone” in key_lower or “mobile” in key_lower:
    return value[:3] + “****” + value[-4:] if len(value) > 7 else value
    if “email” in key_lower:
    parts = value.split(“@”)
    return parts[0][:2] + “***@” + parts[1] if len(parts) == 2 else value
    if “name” in key_lower and len(value) >= 2:
    return value[0] + “**”

    # 按内容模式匹配
    for pattern_name, pattern in cls.SENSITIVE_PATTERNS.items():
    if re.fullmatch(pattern, str(value)):
    return value[:3] + “****” + value[-2:]

    return value

    @classmethod
    def mask_dict(cls, data: Dict[str, Any]) -> Dict[str, Any]:
    “””递归脱敏字典”””
    result = {}
    for key, value in data.items():
    if isinstance(value, dict):
    result[key] = cls.mask_dict(value)
    elif isinstance(value, str):
    result[key] = cls.mask_value(key, value)
    else:
    result[key] = value
    return result
    “`

    ### 5.3 访问控制列表 (ACL)

    “`python
    @dataclass
    class AccessPolicy:
    “””细粒度访问控制”””

    user_roles: Dict[str, List[str]] = field(default_factory=lambda: {
    “admin”: [“query-database”, “search-documentation”, “call-rest-api”, “manage-rules”],
    “developer”: [“query-database”, “search-documentation”, “call-rest-api”],
    “analyst”: [“query-database (readonly)”, “search-documentation”],
    “viewer”: [“search-documentation”],
    })

    def can_call_tool(
    self, user_id: str, tool_name: str, args: dict
    ) -> tuple[bool, str]:
    “””检查用户是否有权限调用特定工具”””
    # 获取用户角色(从外部系统)
    user_role = “analyst” # TODO: 从用户系统获取
    allowed_tools = self.user_roles.get(user_role, [])

    # 精确匹配或通配匹配
    for allowed in allowed_tools:
    if allowed == tool_name or allowed.startswith(f”{tool_name} (“):
    return True, “”

    return False, f”角色 ‘{user_role}’ 无权限调用 ‘{tool_name}’”
    “`

    ## 六、监控与可观测性

    ### 6.1 OpenTelemetry 集成

    “`python
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

    # 全局 trace provider
    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # OTLP 导出
    otlp_exporter = OTLPSpanExporter(
    endpoint=”http://otel-collector:4317″,
    insecure=True,
    )
    span_processor = BatchSpanProcessor(otlp_exporter)
    trace.get_tracer_provider().add_span_processor(span_processor)

    # 在 MCP 工具调用中添加 tracing
    @server.call_tool()
    async def handle_call_tool(name, arguments):
    with tracer.start_as_current_span(f”mcp/tools/{name}”) as span:
    span.set_attribute(“tool.name”, name)
    span.set_attribute(“tool.args_count”, len(arguments or {}))

    start = time.time()
    result = await actual_handler(name, arguments)
    duration = time.time() – start

    span.set_attribute(“tool.duration_ms”, duration * 1000)
    span.set_attribute(“tool.success”, “error” not in result)

    return result
    “`

    ### 6.2 健康检查与指标暴露

    “`python
    from prometheus_client import Counter, Histogram, Gauge, generate_latest

    # 指标定义
    TOOL_CALLS = Counter(
    “mcp_tool_calls_total”,
    “Total MCP tool calls”,
    [“tool_name”, “status”],
    )

    TOOL_DURATION = Histogram(
    “mcp_tool_duration_seconds”,
    “MCP tool call duration”,
    [“tool_name”],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
    )

    ACTIVE_CONNECTIONS = Gauge(
    “mcp_active_connections”,
    “Number of active MCP connections”,
    )

    # 在工具调用时记录
    @server.call_tool()
    async def handle_call_tool(name, arguments):
    TOOL_CALLS.labels(tool_name=name, status=”started”).inc()
    start = time.time()

    try:
    result = await actual_handler(name, arguments)
    TOOL_CALLS.labels(tool_name=name, status=”success”).inc()
    return result
    except Exception as e:
    TOOL_CALLS.labels(tool_name=name, status=”error”).inc()
    raise
    finally:
    TOOL_DURATION.labels(tool_name=name).observe(time.time() – start)

    # /metrics 端点
    @app.route(“/metrics”)
    async def metrics(request):
    return Response(
    content=generate_latest(),
    media_type=”text/plain”,
    )
    “`

    ## 七、高级模式:多工具编排

    ### 7.1 复合工具

    “`python
    @server.call_tool()
    async def handle_composite_tool(name, arguments):
    “””复合工具——内部编排多个子工具”””
    if name == “user-360-view”:
    # 1. 查用户信息
    user_data = await execute_database_query({
    “sql”: f”SELECT * FROM users WHERE id = {arguments[‘user_id’]}”
    })
    user_json = json.loads(user_data[0].text)

    # 2. 查相关文档
    docs = await search_knowledge_base({
    “query”: f”用户 {user_json[‘rows’][0][1]} 相关文档”,
    “top_k”: 3,
    })

    # 3. 查操作日志
    logs = await proxy_rest_api({
    “endpoint”: f”/api/v1/audit-logs?user_id={arguments[‘user_id’]}”,
    “method”: “GET”,
    })

    return [types.TextContent(
    type=”text”,
    text=json.dumps({
    “user_profile”: user_json,
    “related_docs”: json.loads(docs[0].text),
    “recent_activity”: json.loads(logs[0].text),
    “generated_at”: time.strftime(“%Y-%m-%dT%H:%M:%SZ”, time.gmtime()),
    }, ensure_ascii=False, indent=2),
    )]
    “`

    ### 7.2 工具链与缓存

    “`python
    from functools import lru_cache
    from datetime import datetime, timedelta

    class ToolCache:
    “””带 TTL 的分布式工具结果缓存”””

    def __init__(self, redis_client=None):
    self._memory_cache = {}
    self._redis = redis_client

    async def get_or_compute(
    self, key: str, ttl: timedelta, compute_fn
    ):
    # 缓存命中检查
    cache_key = f”mcp:cache:{key}”

    # 内存缓存(毫秒级)
    if cache_key in self._memory_cache:
    entry = self._memory_cache[cache_key]
    if datetime.now() **下期预告:** 《OpenClaw + Spring Boot 自动化实战》—— 用 Spring Boot 构建企业级 MCP 服务器,深入 Java 生态的 MCP 集成模式。