414 lines
13 KiB
Python
414 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
基于 mcp_docx.py 封装的 MCP 服务器。
|
||
|
||
暴露两个主要 MCP 工具:
|
||
- list_docx_images:列出 DOCX 中的图片信息
|
||
- edit_docx: 进行文本替换 / 关键字上色 / 图片替换
|
||
|
||
额外提供 HTTP 文件接口(仅在 http 模式下可用):
|
||
- POST /upload: 上传文件到服务器
|
||
- GET /download/{filename}: 下载服务器上的文件
|
||
|
||
当前推荐的传输方式:
|
||
- stdio(本地调试)
|
||
- streamable-http(远程 HTTP,路径固定为 /mcp,推荐)
|
||
|
||
用法:
|
||
# 本地 stdio 模式(默认)
|
||
python mcp_docx_server.py --transport stdio
|
||
|
||
# HTTP 远程模式(推荐,默认 0.0.0.0:8080,对外暴露 /mcp)
|
||
python mcp_docx_server.py --transport http
|
||
python mcp_docx_server.py --transport http --host 0.0.0.0 --port 8080
|
||
|
||
# 客户端连接地址(http 模式):
|
||
# MCP 端点: http://<host>:<port>/mcp
|
||
|
||
注意:底层仍然完全复用 mcp_docx.py 中的逻辑,只是通过 MCP SDK 对外提供。
|
||
"""
|
||
|
||
import argparse
|
||
import os
|
||
import tempfile
|
||
import urllib.parse
|
||
from typing import Any, Dict, List, Optional
|
||
|
||
import requests
|
||
from mcp.server.fastmcp import FastMCP
|
||
from mcp.server.transport_security import TransportSecuritySettings
|
||
|
||
from mcp_docx import get_images_info, process, _parse_span_replacement
|
||
|
||
# DNS-rebinding protection stays enabled unless MCP_DISABLE_HOST_CHECK is
# exactly "1".
_disable_dns_rebinding = os.getenv("MCP_DISABLE_HOST_CHECK") == "1"

# Disabled variant: meant for local use or deployments where an outer gateway
# already performs access control, see
# https://github.com/modelcontextprotocol/python-sdk/issues/1798
#
# Default variant: protection on, with the hosts/origins below allowed.
# To serve through a gateway or domain name, append it to allowed_hosts /
# allowed_origins here.
transport_security = (
    TransportSecuritySettings(enable_dns_rebinding_protection=False)
    if _disable_dns_rebinding
    else TransportSecuritySettings(
        enable_dns_rebinding_protection=True,
        allowed_hosts=[
            "localhost:*",
            "127.0.0.1:*",
            "149.88.66.186:*",
        ],
        allowed_origins=[
            "http://localhost:*",
            "http://127.0.0.1:*",
            "http://149.88.66.186:*",
        ],
    )
)
|
||
|
||
|
||
# FastMCP application instance named "docx-editor"; the @mcp.tool() and
# @mcp.custom_route() decorators in this module register against it.
mcp = FastMCP("docx-editor", transport_security=transport_security)
|
||
|
||
# Module-level server configuration. Every entry starts as None and is filled
# in by the command-line entry point before the server starts.
_server_config = dict(host=None, port=None, transport=None)
|
||
|
||
|
||
def _is_url(path: str) -> bool:
    """Return True when *path* begins with an ``http://`` or ``https://`` scheme.

    The scheme is compared case-insensitively (URI schemes are
    case-insensitive per RFC 3986), so ``HTTP://host/a.docx`` is recognised
    too. Only the prefix is inspected; the rest of the string is not
    validated.

    Args:
        path: A file name, local path, or URL.

    Returns:
        True for HTTP/HTTPS URLs, False for anything else.
    """
    # "https://" is 8 characters, so lower-casing the first 8 is sufficient.
    return path[:8].lower().startswith(("http://", "https://"))
|
||
|
||
|
||
def _download_to_temp(url: str, suffix: str = ".tmp") -> str:
    """Download *url* into a new temporary file and return the file's path.

    The response body is streamed to disk in 8 KiB chunks. The caller owns
    the returned file and is responsible for deleting it after use.

    Args:
        url: HTTP/HTTPS address to fetch (30 second timeout).
        suffix: File-name suffix given to the temporary file.

    Returns:
        Path of the temporary file containing the response body.

    Raises:
        requests.RequestException: On network errors, timeouts, or an error
            status reported by ``raise_for_status()``.
        OSError: If the temporary file cannot be created or written.
    """
    # With stream=True the connection stays checked out until the body is
    # fully consumed or the response is closed; the context manager releases
    # it on every path, including failures.
    with requests.get(url, stream=True, timeout=30) as resp:
        resp.raise_for_status()

        fd, tmp_path = tempfile.mkstemp(suffix=suffix)
        try:
            with os.fdopen(fd, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        except BaseException:
            # Never leave a partial download behind (also on cancellation or
            # Ctrl-C); removal is best-effort and the error is re-raised.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    return tmp_path
|
||
|
||
|
||
def _build_output_url(abs_output_path: str) -> Optional[str]:
    """Build the download URL for a file, or return None when there is none.

    Only the final path component of *abs_output_path* is used. It is
    percent-encoded so that names containing spaces, non-ASCII characters,
    ``#`` or ``?`` still produce a valid URL.

    Resolution order:
        1. ``MCP_OUTPUT_BASE_URL`` set: ``<base>/<filename>``.
        2. Server running in http mode:
           ``http://<host>:<port>/download/<filename>``. A listen address of
           ``0.0.0.0`` is replaced by ``MCP_PUBLIC_HOST`` when set, otherwise
           by ``localhost``.
        3. Otherwise (e.g. stdio mode): None.

    Args:
        abs_output_path: Path of the file to expose.

    Returns:
        The URL string, or None when no URL can be constructed.
    """
    filename = urllib.parse.quote(os.path.basename(abs_output_path))

    # An explicitly configured base URL always wins.
    base = os.getenv("MCP_OUTPUT_BASE_URL")
    if base:
        return base.rstrip("/") + "/" + filename

    if _server_config["transport"] != "http":
        return None

    host = _server_config["host"]
    if host == "0.0.0.0":
        # The wildcard listen address is not something a client can connect
        # to; prefer the configured public host, falling back to localhost.
        host = os.getenv("MCP_PUBLIC_HOST") or "localhost"

    port = _server_config["port"]
    return f"http://{host}:{port}/download/{filename}"
|
||
|
||
|
||
def _get_upload_dir() -> str:
    """Return the absolute path of the upload directory, creating it if needed.

    The location comes from the ``MCP_UPLOAD_DIR`` environment variable. When
    the variable is unset or empty, ``./uploads`` relative to the current
    working directory is used.

    Returns:
        Absolute path of the (now existing) upload directory.

    Raises:
        OSError: If the directory cannot be created.
    """
    # `or` rather than a getenv default: an empty MCP_UPLOAD_DIR would
    # otherwise reach os.makedirs("") and raise.
    upload_dir = os.getenv("MCP_UPLOAD_DIR") or "./uploads"
    os.makedirs(upload_dir, exist_ok=True)
    return os.path.abspath(upload_dir)
|
||
|
||
|
||
@mcp.tool()
async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
    """List information about every image contained in a DOCX file.

    The document is downloaded to a temporary file, inspected with
    ``get_images_info`` and the temporary file is removed afterwards.

    Args:
        docx_url: HTTP/HTTPS URL of the DOCX file.

    Returns:
        A list with one entry per image. Each entry contains:
        - index: position of the image in the document (starting at 1)
        - media_file: resource path inside the DOCX package
        - ext: image file extension
        - docpr_name: the image name used internally by Word
        - width_cm / height_cm: approximate size in centimetres, may be None
    """
    tmp_path = _download_to_temp(docx_url, suffix=".docx")
    try:
        return get_images_info(tmp_path)
    finally:
        # _download_to_temp hands ownership of the file to its caller, so it
        # has to be removed here; removal is best-effort.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
|
||
|
||
@mcp.tool()
async def edit_docx(
    input_docx_path: str,
    replacements: Optional[List[Dict[str, str]]] = None,
    image_replacements: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Edit a DOCX file using the original mcp_docx logic.

    Supports:
    - plain text replacement
    - colouring keywords with the <span color="FF0000">keyword</span> syntax
    - replacing images by their index

    Args:
        input_docx_path: Name of the input DOCX file inside the upload
            directory, or an HTTP/HTTPS URL. A file from the upload directory
            is edited in place; a document fetched from a URL is written to a
            new file in the upload directory.
        replacements: Text replacement rules, for example:
            [
                {"old": "old title", "new": "<span color='#FF0000'>new title</span>"},
                {"old": "original text", "new": "new text"}
            ]
            Items without a non-empty "old" value are skipped.
        image_replacements: Image replacement rules, for example:
            [
                {"index": 1, "file": "new_chart.png"},
                {"index": 2, "file": "new_photo.jpg"}
            ]
            The "file" field may be a local path or an HTTP/HTTPS URL. Items
            with a non-integer "index" or an empty "file" are skipped.

    Returns:
        {
            "output_path": absolute path of the resulting DOCX file,
            "output_url": download URL of that file when one can be built
                (MCP_OUTPUT_BASE_URL is set or the server runs in http
                mode), otherwise null
        }

    Raises:
        FileNotFoundError: If the input document or a replacement image does
            not exist.
    """
    import sys  # function-scope import: the module has no file-level `sys`

    tmp_input: Optional[str] = None
    tmp_images: List[str] = []
    # Diagnostics go to stderr: with the stdio transport, stdout carries the
    # MCP message stream and must not receive any other text.
    print(
        f"edit_docx: input_docx_path: {input_docx_path}, "
        f"replacements: {replacements}, "
        f"image_replacements: {image_replacements}",
        file=sys.stderr,
    )
    try:
        upload_dir = _get_upload_dir()
        if _is_url(input_docx_path):
            parsed = urllib.parse.urlparse(input_docx_path)
            ext = os.path.splitext(parsed.path)[1] or ".docx"
            tmp_input = _download_to_temp(input_docx_path, suffix=ext)
            local_input = tmp_input

            # The temporary download is deleted in `finally`, so the edited
            # document is written to the upload directory instead, under a
            # name that does not collide with an existing file.
            out_name = os.path.basename(urllib.parse.unquote(parsed.path))
            if out_name in ("", ".", ".."):
                out_name = "output" + ext
            stem, out_ext = os.path.splitext(out_name)
            local_output = os.path.join(upload_dir, out_name)
            counter = 1
            while os.path.exists(local_output):
                local_output = os.path.join(
                    upload_dir, f"{stem}_{counter}{out_ext}"
                )
                counter += 1
        else:
            # NOTE(review): the name is joined onto the upload directory
            # without normalisation, so an absolute path or ".." segments
            # resolve outside of it - confirm this is intended for remote
            # (HTTP) clients.
            local_input = os.path.join(upload_dir, input_docx_path)
            local_output = local_input

        if not os.path.exists(local_input):
            raise FileNotFoundError(f"输入 DOCX 文件不存在: {input_docx_path}")

        if replacements is None:
            replacements = []
        if image_replacements is None:
            image_replacements = []

        # Split each rule into the plain replacement text and the coloured
        # keyword spans (same parsing as the CLI).
        rep_pairs = []
        color_keywords = []
        for item in replacements:
            old = item.get("old")
            new_raw = item.get("new")
            if not old:
                continue
            if new_raw is None:
                new_raw = ""
            new_plain, spans = _parse_span_replacement(new_raw)
            rep_pairs.append((old, new_plain))
            color_keywords.extend(spans)

        # Resolve image replacements; remote images are downloaded to
        # temporary files that are removed in `finally`.
        img_pairs = []
        for item in image_replacements:
            try:
                idx = int(item.get("index"))
            except (TypeError, ValueError):
                continue

            path = item.get("file")
            if not path:
                continue

            local_img = path
            if _is_url(path):
                img_ext = os.path.splitext(urllib.parse.urlparse(path).path)[1]
                tmp_img = _download_to_temp(path, suffix=img_ext or ".img")
                tmp_images.append(tmp_img)
                local_img = tmp_img

            if not os.path.exists(local_img):
                raise FileNotFoundError(f"图片文件不存在: {path}")

            img_pairs.append((idx, local_img))

        # Delegate the actual editing to the original processing function.
        process(
            input_docx=local_input,
            output_docx=local_output,
            replacements=rep_pairs,
            image_replacements=img_pairs,
            color_keywords=color_keywords,
        )

        abs_out = os.path.abspath(local_output)
        return {
            "output_path": abs_out,
            "output_url": _build_output_url(abs_out),
        }
    finally:
        # Best-effort removal of every temporary download.
        leftovers = list(tmp_images)
        if tmp_input:
            leftovers.insert(0, tmp_input)
        for p in leftovers:
            try:
                os.remove(p)
            except OSError:
                pass
|
||
# HTTP 远程模式:添加文件上传下载路由
|
||
from starlette.responses import FileResponse, JSONResponse
|
||
from starlette.requests import Request
|
||
@mcp.custom_route("/upload", methods=["POST"])
async def upload_handler(request: Request):
    """Handle ``POST /upload``: store the form field ``file`` in the upload directory.

    Only the final path component of the client-supplied file name is used.
    If a file with that name already exists, ``_1``, ``_2``, ... is appended
    to the stem until a free name is found.

    Responses (JSON):
        200: success, with ``filename``, ``file_path``, ``file_url``,
            ``size`` and ``message``.
        400: no uploaded file in the ``file`` field, or an unusable file name.
        500: any other failure while storing the file.
    """
    try:
        form = await request.form()
        file = form.get("file")

        # A plain text field named "file" arrives as a str without a
        # `filename` attribute; treat that, and a missing file name, as
        # "no file provided" instead of failing with a 500.
        raw_name = getattr(file, "filename", None)
        if not file or not raw_name:
            return JSONResponse({
                "success": False,
                "message": "未提供文件"
            }, status_code=400)

        # Security: drop any directory part to prevent path traversal.
        filename = os.path.basename(raw_name)
        if filename in ("", ".", ".."):
            return JSONResponse({
                "success": False,
                "message": "文件名无效"
            }, status_code=400)

        upload_dir = _get_upload_dir()
        content = await file.read()

        # Exclusive-create mode ("xb") makes "pick a free name" and "create
        # the file" a single step, so concurrent uploads of the same name
        # cannot overwrite each other.
        base, ext = os.path.splitext(filename)
        counter = 0
        while True:
            candidate = filename if counter == 0 else f"{base}_{counter}{ext}"
            file_path = os.path.join(upload_dir, candidate)
            try:
                with open(file_path, "xb") as f:
                    f.write(content)
                break
            except FileExistsError:
                counter += 1
        filename = candidate

        return JSONResponse({
            "success": True,
            "filename": filename,
            "file_path": file_path,
            "file_url": _build_output_url(file_path),
            "size": len(content),
            "message": f"文件上传成功: {filename}"
        })
    except Exception as e:
        # Route boundary: report the failure as JSON instead of propagating.
        return JSONResponse({
            "success": False,
            "message": f"文件上传失败: {str(e)}"
        }, status_code=500)
|
||
@mcp.custom_route("/download/{filename}", methods=["GET"])
async def download_handler(request: Request):
    """Handle ``GET /download/{filename}``: send a file from the upload directory.

    Only the final path component of the requested name is used.

    Responses:
        200: the file, as ``application/octet-stream``.
        404 (JSON): no entry with that name exists.
        400 (JSON): the entry exists but is not a regular file.
        500 (JSON): any unexpected failure.
    """

    def _failure(message: str, status: int) -> JSONResponse:
        # Every error shares the same JSON envelope.
        return JSONResponse(
            {"success": False, "message": message},
            status_code=status,
        )

    try:
        requested = request.path_params.get("filename")
        storage_root = _get_upload_dir()

        # Security: strip directory components to prevent path traversal.
        safe_name = os.path.basename(requested)
        target = os.path.join(storage_root, safe_name)

        if not os.path.exists(target):
            return _failure(f"文件不存在: {safe_name}", 404)
        if not os.path.isfile(target):
            return _failure(f"不是文件: {safe_name}", 400)

        return FileResponse(
            target,
            filename=safe_name,
            media_type="application/octet-stream",
        )
    except Exception as exc:
        return _failure(f"文件下载失败: {exc!s}", 500)
|
||
|
||
if __name__ == "__main__":
    # Command-line entry point: parse the options, record them in
    # _server_config and start the MCP server on the selected transport.
    import sys  # local import: only the stdio banner below needs it

    parser = argparse.ArgumentParser(description="DOCX MCP 服务器")
    parser.add_argument(
        "--transport",
        choices=["stdio", "http"],
        default="stdio",
        help="传输方式:stdio(本地)或 http(远程 HTTP /streamable-http)",
    )
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="HTTP 模式监听地址(默认 0.0.0.0)",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="HTTP 模式监听端口(默认 8080)",
    )
    args = parser.parse_args()

    if args.transport == "http":
        # Record the listen address so _build_output_url can construct
        # download links.
        _server_config["host"] = args.host
        _server_config["port"] = args.port
        _server_config["transport"] = "http"

        mcp.settings.host = args.host
        mcp.settings.port = args.port

        print(f"🚀 MCP HTTP 服务器启动中 → http://{args.host}:{args.port}/mcp")

        # The /upload and /download routes were registered on `mcp` through
        # @mcp.custom_route when this module was loaded.
        mcp.run(transport="streamable-http")
    else:
        # Local stdio mode.
        _server_config["transport"] = "stdio"
        # The banner must go to stderr: with the stdio transport, stdout is
        # the MCP message channel and may only carry protocol messages.
        print("🚀 MCP stdio 模式启动中(本地使用)", file=sys.stderr)
        mcp.run(transport="stdio")
|