# MCP 服务器开发与企业集成:从零构建生产级 AI 工具网关
> **技术栈:** Python / TypeScript · MCP SDK · SSE/Streamable HTTP · 企业安全体系
> **难度:** ⭐⭐⭐⭐
> **阅读时间:** 25 分钟
> **适合人群:** 后端工程师、AI 应用架构师、平台开发者
—
## 一、为什么 MCP 正在改变 AI 集成的游戏规则?
2024 年底,Anthropic 开源了 **Model Context Protocol (MCP)**,迅速成为 AI 工具集成的事实标准。截至 2026 年 5 月,MCP 已迭代至 2025-04 规范版本,支持 **Streamable HTTP** 传输层,被 OpenAI、Claude、VSCode 等主流平台原生采纳。
### 1.1 传统 AI 集成的痛点
“`mermaid
flowchart LR
A[AI Agent] –>|Function Calling| B[自定义工具函数]
B –> C[API A]
B –> D[API B]
B –> E[API C]
style A fill:#4a90d9,color:#fff
style B fill:#e67e22,color:#fff
“`
每个 Agent 都需要:
– 手写 Function Calling 定义(JSON Schema)
– 自行处理认证、限流、错误重试
– 为每个 LLM 平台重新实现一遍工具层
– 缺乏标准化的工具发现与生命周期管理
### 1.2 MCP 的核心设计哲学
MCP 采用 **客户端-服务器** 架构,让 LLM 应用通过标准协议发现和调用工具:
“`
┌──────────────┐ MCP Protocol ┌──────────────┐
│ MCP Client │ ◄────────────────────► │ MCP Server │
│ (Claude, IDE, │ (JSON-RPC 2.0) │ (你的服务) │
│ Agent 框架) │ │ │
└──────────────┘ └──────────────┘
“`
**三大核心能力:**
– **Tools** — 可执行的函数,LLM 自主调用
– **Resources** — 暴露数据资源(文件、数据库查询)
– **Prompts** — 可复用的提示模板
**关键优势:**
1. **一次开发,到处运行** — 一个 MCP 服务器同时服务 Claude、Cursor、VS Code、自定义 Agent
2. **动态工具发现** — 客户端自动获取工具列表和 Schema
3. **标准化传输** — stdio(进程内)、SSE、Streamable HTTP
4. **安全边界** — 服务器可控制暴露哪些资源和能力
—
## 二、MCP 核心协议深度解析
### 2.1 协议基础
MCP 基于 **JSON-RPC 2.0**,所有通信通过消息交换完成:
“`typescript
// 基础消息结构
interface JSONRPCRequest {
jsonrpc: “2.0”;
id: number | string;
method: string;
params?: Record;
}
interface JSONRPCResponse {
jsonrpc: “2.0”;
id: number | string;
result?: any;
error?: {
code: number;
message: string;
data?: any;
};
}
// 通知(无响应)
interface JSONRPCNotification {
jsonrpc: “2.0”;
method: string;
params?: Record;
}
“`
### 2.2 生命周期与方法清单
“`
Session Lifecycle:
1. Initialize (client → server)
– 协议版本协商
– 能力声明
2. Initialized (client → server) — 通知
3. 正常通信
4. Shutdown / 连接断开
“`
**核心方法一览:**
| 方向 | 方法 | 作用 |
|——|——|——|
| C→S | `initialize` | 握手协商 |
| C→S | `tools/list` | 获取可用工具列表 |
| C→S | `tools/call` | 调用指定工具 |
| C→S | `resources/list` | 列出可用资源 |
| C→S | `resources/read` | 读取资源内容 |
| C→S | `prompts/list` | 列出提示模板 |
| C→S | `prompts/get` | 获取具体提示 |
| S→C | `notifications/tools/list_changed` | 工具列表变更通知 |
| S→C | `logging/message` | 日志输出 |
### 2.3 传输层对比
| 传输方式 | 适合场景 | 优点 | 缺点 |
|———-|———|——|——|
| **stdio** | 本地开发、CLI 工具、VSCode 扩展 | 零配置、低延迟 | 绑定进程生命周期 |
| **SSE** | 远程服务、Web 集成 | 实时推送、传统兼容 | 需要事件流客户端 |
| **Streamable HTTP** | 现代应用(2025-04+) | 简化传输、标准 HTTP | 需要更成熟的规范实现 |
**Streamable HTTP (2025-04 新规范)** 将传输层简化为标准 HTTP POST:
– 客户端正常 POST 请求即可
– 需要流式推送时使用 SSE
– 大幅降低了客户端实现门槛
—
## 三、从零搭建一个生产级 MCP 服务器
我们以 **Python** 为例,构建一个企业级数据查询 MCP 服务器。
### 3.1 项目骨架
“`bash
mkdir mcp-enterprise-server
cd mcp-enterprise-server
python -m venv .venv
source .venv/bin/activate
pip install mcp httpx python-dotenv pydantic
“`
### 3.2 基础服务器实现
“`python
# server.py
import asyncio
import json
import logging
from typing import Any
from mcp.server import Server, NotificationOptions
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(“mcp-enterprise”)
# 创建服务器实例
server = Server(“enterprise-gateway”)
# ─── 工具处理 ───────────────────────────────────────
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
“””动态注册所有可用工具”””
return [
types.Tool(
name=”query-database”,
description=”执行 SQL 查询并返回结构化结果。自动脱敏敏感字段。”,
inputSchema={
“type”: “object”,
“properties”: {
“sql”: {
“type”: “string”,
“description”: “SQL 查询语句”,
},
“limit”: {
“type”: “integer”,
“description”: “最大返回行数”,
“default”: 100,
“maximum”: 1000,
},
},
“required”: [“sql”],
},
),
types.Tool(
name=”search-documentation”,
description=”在企业知识库中搜索相关文档”,
inputSchema={
“type”: “object”,
“properties”: {
“query”: {
“type”: “string”,
“description”: “搜索关键词”,
},
“top_k”: {
“type”: “integer”,
“description”: “返回结果数量”,
“default”: 5,
},
},
“required”: [“query”],
},
),
types.Tool(
name=”call-rest-api”,
description=”调用内部 REST API,自动处理认证和重试”,
inputSchema={
“type”: “object”,
“properties”: {
“endpoint”: {
“type”: “string”,
“description”: “API 端点路径(相对 base_url)”,
},
“method”: {
“type”: “string”,
“enum”: [“GET”, “POST”, “PUT”, “DELETE”],
“description”: “HTTP 方法”,
},
“body”: {
“type”: “object”,
“description”: “请求体(可选)”,
},
},
“required”: [“endpoint”, “method”],
},
),
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict[str, Any] | None
) -> list[types.TextContent]:
“””工具调用入口——统一路由”””
logger.info(f”Tool called: {name} with args={arguments}”)
if name == “query-database”:
return await execute_database_query(arguments)
elif name == “search-documentation”:
return await search_knowledge_base(arguments)
elif name == “call-rest-api”:
return await proxy_rest_api(arguments)
else:
raise ValueError(f”未知工具: {name}”)
# ─── 工具实现 ────────────────────────────────────────
async def execute_database_query(args: dict) -> list[types.TextContent]:
“””数据库查询(此处为模拟)”””
sql = args.get(“sql”, “”)
limit = min(args.get(“limit”, 100), 1000)
# TODO: 接入真实数据库连接池
# 安全审计: 记录所有 SQL 查询
logger.info(f”[AUDIT] SQL query: {sql[:200]}”)
mock_result = {
“columns”: [“id”, “name”, “status”, “created_at”],
“rows”: [
[1, “示例记录A”, “active”, “2026-01-15”],
[2, “示例记录B”, “inactive”, “2026-02-20”],
],
“total”: 2,
“truncated”: False,
“duration_ms”: 12,
}
return [types.TextContent(
type=”text”,
text=json.dumps(mock_result, ensure_ascii=False, indent=2),
)]
async def search_knowledge_base(args: dict) -> list[types.TextContent]:
“””向量知识库搜索”””
query = args.get(“query”, “”)
top_k = min(args.get(“top_k”, 5), 20)
# TODO: 接入向量数据库
results = [
{“title”: “部署指南”, “score”: 0.95, “url”: “/docs/deploy”},
{“title”: “API 参考”, “score”: 0.89, “url”: “/docs/api”},
]
return [types.TextContent(
type=”text”,
text=json.dumps({
“query”: query,
“results”: results[:top_k],
“total_hits”: len(results),
}, ensure_ascii=False, indent=2),
)]
async def proxy_rest_api(args: dict) -> list[types.TextContent]:
“””代理 REST API 调用”””
endpoint = args[“endpoint”]
method = args[“method”]
body = args.get(“body”)
# 使用 httpx 进行调用
import httpx
async with httpx.AsyncClient(
base_url=”https://api.internal.example.com”,
timeout=30.0,
) as client:
try:
response = await client.request(
method=method,
url=endpoint,
json=body,
headers={
“Authorization”: “Bearer ${INTERNAL_API_KEY}”,
“X-Request-Id”: “mcp-” + str(asyncio.get_running_loop().time()),
},
)
response.raise_for_status()
data = response.json()
except httpx.HTTPError as e:
return [types.TextContent(
type=”text”,
text=json.dumps({
“error”: str(e),
“status_code”: getattr(e.response, “status_code”, None),
}),
)]
return [types.TextContent(
type=”text”,
text=json.dumps(data, ensure_ascii=False, indent=2),
)]
# ─── 资源处理(可选)─────────────────────────────────
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
return [
types.Resource(
uri=”company://policies/acceptable-use”,
name=”可接受使用政策”,
description=”企业 AI 使用政策”,
mimeType=”text/markdown”,
),
]
@server.read_resource()
async def handle_read_resource(uri: str) -> str:
if uri == “company://policies/acceptable-use”:
return “# 可接受使用政策nnAI 工具仅可用于经授权的企业业务场景…”
raise ValueError(f”未知资源: {uri}”)
# ─── 启动入口 ────────────────────────────────────────
async def main():
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name=”enterprise-gateway”,
server_version=”1.0.0″,
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == “__main__”:
asyncio.run(main())
“`
### 3.3 使用 Streamable HTTP(2025-04 规范)
如果想用 HTTP 暴露服务,使用 **MCP 官方 HTTP 传输**:
“`python
from mcp.server.http import HTTPServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
import uvicorn
# 将 MCP Server 绑定到 HTTP
http_transport = HTTPServerTransport(
server=server,
path=”/mcp”,
)
app = Starlette(routes=[
Route(“/mcp”, endpoint=http_transport.handle_request, methods=[“POST”]),
Route(“/health”, endpoint=lambda r: {“status”: “ok”}),
])
if __name__ == “__main__”:
uvicorn.run(app, host=”0.0.0.0″, port=8000)
“`
### 3.4 客户端连接示例
“`python
# client.py — 测试连接
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
server_params = StdioServerParameters(
command=”python”,
args=[“server.py”],
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 1. 初始化
await session.initialize()
# 2. 获取工具列表
tools = await session.list_tools()
print(f”可用工具: {[t.name for t in tools.tools]}”)
# 3. 调用工具
result = await session.call_tool(
“query-database”,
{“sql”: “SELECT * FROM users LIMIT 5″}
)
print(f”结果: {result.content[0].text}”)
asyncio.run(main())
“`
—
## 四、企业级集成架构
现实场景中,MCP 服务器不会直接暴露给外部。以下是一个经过生产验证的架构设计:
“`
┌────────────────────────────────────────────────────────┐
│ 客户端层 │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌─────────────┐ │
│ │ Claude │ │ Cursor │ │ VS Code│ │ 自定义 Agent │ │
│ └───┬────┘ └───┬────┘ └───┬────┘ └──────┬──────┘ │
└──────┼───────────┼───────────┼───────────────┼─────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────────────────────────────────────────────────────┐
│ MCP Gateway (1:N) │
│ ┌─────────────────────────────────────────────────┐ │
│ │ 认证层: JWT / OAuth2 / API Key │ │
│ │ 限流层: 令牌桶 + 用户级配额 │ │
│ │ 审计层: 所有请求/响应全量日志 │ │
│ │ 路由层: 按能力分发到不同 MCP Server │ │
│ └─────────────────────────────────────────────────┘ │
└──────┬─────────────────────────────────────┬───────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ MCP Server 1 │ │ MCP Server 2 │
│ (数据库服务) │ │ (搜索服务) │
│ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │
│ │ PostgreSQL│ │ │ │Elasticsearch│ │
│ │ MySQL │ │ │ │向量库 │ │
│ │ 数据湖 │ │ │ └─────────┘ │
│ └─────────┘ │ └──────────────┘
└──────────────┘
“`
### 4.1 MCP Gateway 实现
“`python
# gateway.py — 企业级 MCP 网关
import json
import time
import hashlib
import asyncio
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class RateLimitBucket:
tokens: float
last_refill: float
class MCPGateway:
“””MCP 网关——统一入口、认证、限流、审计”””
def __init__(self, config: dict):
self.config = config
self.rate_limiters: Dict[str, RateLimitBucket] = {}
self.audit_log: List[dict] = []
self.backend_servers: Dict[str, dict] = {
“database”: {
“type”: “stdio”,
“command”: “python”,
“args”: [“servers/database_server.py”],
},
“knowledge”: {
“type”: “http”,
“url”: “http://localhost:8001/mcp”,
},
“monitoring”: {
“type”: “http”,
“url”: “http://localhost:8002/mcp”,
},
}
def authenticate(self, headers: dict) -> Optional[str]:
“””认证——支持 API Key 和 JWT”””
api_key = headers.get(“X-API-Key”, “”)
auth_header = headers.get(“Authorization”, “”)
# API Key 校验
if api_key in self.config.get(“api_keys”, {}):
return self.config[“api_keys”][api_key]
# JWT 校验
if auth_header.startswith(“Bearer “):
token = auth_header[7:]
# TODO: 验证 JWT 签名
return “jwt-user”
return None
def check_rate_limit(self, user_id: str) -> bool:
“””令牌桶限流”””
now = time.time()
bucket = self.rate_limiters.get(user_id)
if not bucket:
self.rate_limiters[user_id] = RateLimitBucket(
tokens=self.config.get(“rate_limit”, 60),
last_refill=now,
)
return True
# 补充令牌
elapsed = now – bucket.last_refill
bucket.tokens += elapsed * (self.config.get(“rate_limit”, 60) / 60.0)
bucket.tokens = min(bucket.tokens, self.config.get(“rate_limit”, 60))
bucket.last_refill = now
if bucket.tokens 10000:
self.audit_log = self.audit_log[-5000:]
# 发送到外部审计系统
# asyncio.create_task(send_to_audit_system(entry))
async def route_and_call(
self, user: str, tool_name: str, arguments: dict
) -> dict:
“””路由到对应的后端 MCP 服务器”””
# 工具 → 后端映射
tool_to_server = {
“query-database”: “database”,
“search-documentation”: “knowledge”,
“call-rest-api”: “monitoring”,
“analyze-logs”: “monitoring”,
}
server_name = tool_to_server.get(tool_name)
if not server_name:
return {“error”: f”未知工具: {tool_name}”}
server_config = self.backend_servers[server_name]
if server_config[“type”] == “stdio”:
return await self._call_stdio_server(
server_config, tool_name, arguments
)
else:
return await self._call_http_server(
server_config, tool_name, arguments
)
“`
—
## 五、安全最佳实践
### 5.1 输入净化与 SQL 注入防护
“`python
import re
from typing import List
class SQLGuard:
“””SQL 安全守卫——防止 AI 生成恶意查询”””
# 禁止的 SQL 关键词
FORBIDDEN_PATTERNS = [
r”bDROPb”,
r”bTRUNCATEb”,
r”bDELETEb(?!s+FROMs+w+s+WHERE)”, # 允许有 WHERE 的 DELETE
r”bALTERb”,
r”bCREATEs+USERb”,
r”bGRANTb”,
r”bEXECb”,
r”bEXECUTEb”,
r”bINTOs+OUTFILEb”,
r”bLOADs+FILEb”,
r”;s*$”, # 禁止多语句
]
@classmethod
def validate_query(cls, sql: str) -> tuple[bool, str]:
“””返回 (是否通过, 错误信息)”””
sql_upper = sql.upper()
for pattern in cls.FORBIDDEN_PATTERNS:
if re.search(pattern, sql_upper):
return False, f”查询包含禁止操作: {pattern}”
if len(sql) > 5000:
return False, “查询长度超过限制(5000字符)”
return True, “”
@classmethod
def apply_readonly(cls, sql: str) -> str:
“””强制转为只读查询”””
sql = sql.strip().rstrip(“;”)
if not sql.upper().startswith(“SELECT”):
sql = f”SELECT * FROM ({sql}) AS _subquery_ LIMIT 0″
return sql
“`
### 5.2 敏感数据脱敏
“`python
import re
from typing import Any, Dict
class DataMasker:
“””自动脱敏工具”””
SENSITIVE_PATTERNS = {
“phone”: r”1[3-9]d{9}”,
“email”: r”b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+.[A-Za-z]{2,}b”,
“id_card”: r”d{18}|d{17}X”,
“credit_card”: r”d{4}[-s]?d{4}[-s]?d{4}[-s]?d{4}”,
}
@classmethod
def mask_value(cls, key: str, value: str) -> str:
“””根据字段名或内容进行脱敏”””
# 按字段名匹配
key_lower = key.lower()
if any(kw in key_lower for kw in [“password”, “secret”, “token”, “key”]):
return “******”
if “phone” in key_lower or “mobile” in key_lower:
return value[:3] + “****” + value[-4:] if len(value) > 7 else value
if “email” in key_lower:
parts = value.split(“@”)
return parts[0][:2] + “***@” + parts[1] if len(parts) == 2 else value
if “name” in key_lower and len(value) >= 2:
return value[0] + “**”
# 按内容模式匹配
for pattern_name, pattern in cls.SENSITIVE_PATTERNS.items():
if re.fullmatch(pattern, str(value)):
return value[:3] + “****” + value[-2:]
return value
@classmethod
def mask_dict(cls, data: Dict[str, Any]) -> Dict[str, Any]:
“””递归脱敏字典”””
result = {}
for key, value in data.items():
if isinstance(value, dict):
result[key] = cls.mask_dict(value)
elif isinstance(value, str):
result[key] = cls.mask_value(key, value)
else:
result[key] = value
return result
“`
### 5.3 访问控制列表 (ACL)
“`python
@dataclass
class AccessPolicy:
“””细粒度访问控制”””
user_roles: Dict[str, List[str]] = field(default_factory=lambda: {
“admin”: [“query-database”, “search-documentation”, “call-rest-api”, “manage-rules”],
“developer”: [“query-database”, “search-documentation”, “call-rest-api”],
“analyst”: [“query-database (readonly)”, “search-documentation”],
“viewer”: [“search-documentation”],
})
def can_call_tool(
self, user_id: str, tool_name: str, args: dict
) -> tuple[bool, str]:
“””检查用户是否有权限调用特定工具”””
# 获取用户角色(从外部系统)
user_role = “analyst” # TODO: 从用户系统获取
allowed_tools = self.user_roles.get(user_role, [])
# 精确匹配或通配匹配
for allowed in allowed_tools:
if allowed == tool_name or allowed.startswith(f”{tool_name} (“):
return True, “”
return False, f”角色 ‘{user_role}’ 无权限调用 ‘{tool_name}’”
“`
—
## 六、监控与可观测性
### 6.1 OpenTelemetry 集成
“`python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
# 全局 trace provider
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# OTLP 导出
otlp_exporter = OTLPSpanExporter(
endpoint=”http://otel-collector:4317″,
insecure=True,
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 在 MCP 工具调用中添加 tracing
@server.call_tool()
async def handle_call_tool(name, arguments):
with tracer.start_as_current_span(f”mcp/tools/{name}”) as span:
span.set_attribute(“tool.name”, name)
span.set_attribute(“tool.args_count”, len(arguments or {}))
start = time.time()
result = await actual_handler(name, arguments)
duration = time.time() – start
span.set_attribute(“tool.duration_ms”, duration * 1000)
span.set_attribute(“tool.success”, “error” not in result)
return result
“`
### 6.2 健康检查与指标暴露
“`python
from prometheus_client import Counter, Histogram, Gauge, generate_latest
# 指标定义
TOOL_CALLS = Counter(
“mcp_tool_calls_total”,
“Total MCP tool calls”,
[“tool_name”, “status”],
)
TOOL_DURATION = Histogram(
“mcp_tool_duration_seconds”,
“MCP tool call duration”,
[“tool_name”],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0],
)
ACTIVE_CONNECTIONS = Gauge(
“mcp_active_connections”,
“Number of active MCP connections”,
)
# 在工具调用时记录
@server.call_tool()
async def handle_call_tool(name, arguments):
TOOL_CALLS.labels(tool_name=name, status=”started”).inc()
start = time.time()
try:
result = await actual_handler(name, arguments)
TOOL_CALLS.labels(tool_name=name, status=”success”).inc()
return result
except Exception as e:
TOOL_CALLS.labels(tool_name=name, status=”error”).inc()
raise
finally:
TOOL_DURATION.labels(tool_name=name).observe(time.time() – start)
# /metrics 端点
@app.route(“/metrics”)
async def metrics(request):
return Response(
content=generate_latest(),
media_type=”text/plain”,
)
“`
—
## 七、高级模式:多工具编排
### 7.1 复合工具
“`python
@server.call_tool()
async def handle_composite_tool(name, arguments):
“””复合工具——内部编排多个子工具”””
if name == “user-360-view”:
# 1. 查用户信息
user_data = await execute_database_query({
“sql”: f”SELECT * FROM users WHERE id = {arguments[‘user_id’]}”
})
user_json = json.loads(user_data[0].text)
# 2. 查相关文档
docs = await search_knowledge_base({
“query”: f”用户 {user_json[‘rows’][0][1]} 相关文档”,
“top_k”: 3,
})
# 3. 查操作日志
logs = await proxy_rest_api({
“endpoint”: f”/api/v1/audit-logs?user_id={arguments[‘user_id’]}”,
“method”: “GET”,
})
return [types.TextContent(
type=”text”,
text=json.dumps({
“user_profile”: user_json,
“related_docs”: json.loads(docs[0].text),
“recent_activity”: json.loads(logs[0].text),
“generated_at”: time.strftime(“%Y-%m-%dT%H:%M:%SZ”, time.gmtime()),
}, ensure_ascii=False, indent=2),
)]
“`
### 7.2 工具链与缓存
“`python
from functools import lru_cache
from datetime import datetime, timedelta
class ToolCache:
“””带 TTL 的分布式工具结果缓存”””
def __init__(self, redis_client=None):
self._memory_cache = {}
self._redis = redis_client
async def get_or_compute(
self, key: str, ttl: timedelta, compute_fn
):
# 缓存命中检查
cache_key = f”mcp:cache:{key}”
# 内存缓存(毫秒级)
if cache_key in self._memory_cache:
entry = self._memory_cache[cache_key]
if datetime.now() **下期预告:** 《OpenClaw + Spring Boot 自动化实战》—— 用 Spring Boot 构建企业级 MCP 服务器,深入 Java 生态的 MCP 集成模式。