Files
mcp/mcp_docx_server.py
2026-02-27 16:11:28 +08:00

413 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
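MCP server built on top of mcp_docx.py.

Exposes two main MCP tools:
- list_docx_images: list information about the images in a DOCX file
- edit_docx: text replacement / keyword colouring / image replacement

Additional HTTP file endpoints (available in http mode only):
- POST /upload: upload a file to the server
- GET /download/{filename}: download a file stored on the server

Recommended transports:
- stdio: local debugging
- streamable-http: remote HTTP, path fixed at /mcp (recommended)

Usage:
    # Local stdio mode (default)
    python mcp_docx_server.py --transport stdio
    # Remote HTTP mode (recommended; defaults to 0.0.0.0:8080, serving /mcp)
    python mcp_docx_server.py --transport http
    python mcp_docx_server.py --transport http --host 0.0.0.0 --port 8080
    # Client connection address (http mode):
    # MCP endpoint: http://<host>:<port>/mcp

Note: all document processing is still delegated to mcp_docx.py; this module
only exposes that logic through the MCP SDK.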
"""
import argparse
import os
import tempfile
import urllib.parse
from typing import Any, Dict, List, Optional
import requests
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from mcp_docx import get_images_info, process, _parse_span_replacement
# Setting MCP_DISABLE_HOST_CHECK=1 switches DNS rebinding protection off
# (suitable for local use, or when an outer gateway already enforces security).
# See https://github.com/modelcontextprotocol/python-sdk/issues/1798
_disable_dns_rebinding = os.environ.get("MCP_DISABLE_HOST_CHECK") == "1"

# Default: protection stays on and only the hosts/origins listed below are
# accepted. Append to allowed_hosts / allowed_origins here when the server is
# reached through a gateway or a domain name.
transport_security = (
    TransportSecuritySettings(enable_dns_rebinding_protection=False)
    if _disable_dns_rebinding
    else TransportSecuritySettings(
        enable_dns_rebinding_protection=True,
        allowed_hosts=["localhost:*", "127.0.0.1:*", "149.88.66.186:*"],
        allowed_origins=[
            "http://localhost:*",
            "http://127.0.0.1:*",
            "http://149.88.66.186:*",
        ],
    )
)

# The FastMCP application object that the tools and custom routes attach to.
mcp = FastMCP("docx-editor", transport_security=transport_security)
# 全局变量:存储服务器配置
_server_config = {
"host": None,
"port": None,
"transport": None,
}
def _is_url(path: str) -> bool:
"""简单判断一个字符串是否为 HTTP/HTTPS URL。"""
return path.startswith("http://") or path.startswith("https://")
def _download_to_temp(url: str, suffix: str = ".tmp") -> str:
    """Download *url* into a new temporary file and return the file's path.

    The body is streamed to disk in 8 KiB chunks. The caller owns the returned
    file and is responsible for deleting it when finished.

    Args:
        url: HTTP/HTTPS URL to fetch (30 second timeout).
        suffix: Suffix given to the temporary file name.

    Returns:
        Path of the temporary file holding the downloaded content.

    Raises:
        requests.HTTPError: The server answered with an error status.
        requests.RequestException: The request itself failed.
        OSError: The temporary file could not be created or written.
    """
    # Use the response as a context manager so the underlying connection is
    # released on every path; with stream=True it would otherwise stay open
    # when raise_for_status() or the write loop raises.
    with requests.get(url, stream=True, timeout=30) as resp:
        resp.raise_for_status()
        fd, tmp_path = tempfile.mkstemp(suffix=suffix)
        try:
            with os.fdopen(fd, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        except BaseException:
            # Do not leave a partial file behind (also on cancellation);
            # the original error is always re-raised.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
    return tmp_path
def _build_output_url(abs_output_path: str) -> Optional[str]:
"""
构造输出文件的下载 URL。
优先使用环境变量 MCP_OUTPUT_BASE_URL否则根据服务器配置自动构建。
约定:
- 如果设置了 MCP_OUTPUT_BASE_URL: 使用该 URL 作为基础
- 否则在 http 模式下: http://host:port/download/{filename}
- stdio 模式下: 返回 None
"""
# 优先使用环境变量
base = os.getenv("MCP_OUTPUT_BASE_URL")
if base:
filename = os.path.basename(abs_output_path)
return base.rstrip("/") + "/" + filename
# 如果是 http 模式,自动构建下载 URL
if _server_config["transport"] == "http":
host = _server_config["host"]
port = _server_config["port"]
filename = os.path.basename(abs_output_path)
# 如果 host 是 0.0.0.0,尝试使用更具体的地址
if host == "0.0.0.0":
# 优先使用环境变量指定的公网地址
public_host = os.getenv("MCP_PUBLIC_HOST")
if public_host:
host = public_host
else:
# 默认使用 localhost
host = "localhost"
return f"http://{host}:{port}/download/{filename}"
return None
def _get_upload_dir() -> str:
"""
获取文件上传目录。
优先使用环境变量 MCP_UPLOAD_DIR否则使用当前目录下的 uploads 文件夹。
"""
upload_dir = os.getenv("MCP_UPLOAD_DIR", "./uploads")
os.makedirs(upload_dir, exist_ok=True)
return os.path.abspath(upload_dir)
@mcp.tool()
async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
    """List information about every image in the given DOCX file.

    Args:
        docx_url: HTTP/HTTPS URL of the DOCX file.

    Returns:
        A list of image entries, each containing:
        - index: position of the image in the document (starting at 1)
        - media_file: resource path inside the DOCX
        - ext: image file extension
        - docpr_name: the image name used internally by Word
        - width_cm / height_cm: approximate size in centimetres, may be None
    """
    tmp_docx = _download_to_temp(docx_url, suffix=".docx")
    try:
        return get_images_info(tmp_docx)
    finally:
        # The download helper leaves deletion to its caller; without this the
        # temporary copy would accumulate on every call.
        try:
            os.remove(tmp_docx)
        except OSError:
            pass
@mcp.tool()
async def edit_docx(input_docx_path: str, replacements: Optional[List[Dict[str, str]]] = None, image_replacements: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
    """Edit a DOCX file using the mcp_docx processing logic.

    Supports:
    - plain text replacement
    - colouring keywords with the <span color="FF0000">keyword</span> syntax
    - replacing images selected by their index

    Args:
        input_docx_path: Local path or HTTP/HTTPS URL of the input DOCX.
        replacements: Text replacement rules, for example:
            [
              {"old": "Old title", "new": "<span color='#FF0000'>New title</span>"},
              {"old": "original", "new": "updated"}
            ]
            Entries without an "old" value are skipped; a missing "new" is
            treated as an empty string.
        image_replacements: Image replacement rules, for example:
            [
              {"index": 1, "file": "new_chart.png"},
              {"index": 2, "file": "new_photo.jpg"}
            ]
            The "file" field may also be a local path or an HTTP/HTTPS URL.
            Entries with a non-integer "index" or no "file" are skipped.

    Returns:
        A dict with:
        - "output_path": absolute path of the generated DOCX. A local input
          is edited in place; for a URL input the result is written to a new
          file in the upload directory.
        - "output_url": URL for that file as built by _build_output_url, or
          None when no URL can be built.

    Raises:
        FileNotFoundError: The input DOCX or a replacement image is missing.
    """
    import sys  # local import: only needed for the diagnostic line below

    tmp_files: List[str] = []  # downloaded temporaries, removed on exit
    out_path: Optional[str] = None  # freshly created output (URL input only)
    succeeded = False
    # Diagnostics go to stderr: under the stdio transport stdout carries the
    # MCP protocol stream and must not receive free-form text.
    print(f"edit_docx: input_docx_path: {input_docx_path}, replacements: {replacements}, image_replacements: {image_replacements}", file=sys.stderr)
    try:
        local_input = input_docx_path
        input_ext = ".docx"
        input_is_remote = _is_url(input_docx_path)
        if input_is_remote:
            parsed = urllib.parse.urlparse(input_docx_path)
            input_ext = os.path.splitext(parsed.path)[1] or ".docx"
            local_input = _download_to_temp(input_docx_path, suffix=input_ext)
            tmp_files.append(local_input)
        if not os.path.exists(local_input):
            raise FileNotFoundError(f"输入 DOCX 文件不存在: {input_docx_path}")

        # Split each rule into a plain-text replacement plus the coloured
        # keyword spans extracted from its "new" value (same logic as the CLI).
        rep_pairs = []
        color_keywords = []
        for item in replacements or []:
            old = item.get("old")
            if not old:
                continue
            new_raw = item.get("new")
            new_plain, spans = _parse_span_replacement("" if new_raw is None else new_raw)
            rep_pairs.append((old, new_plain))
            color_keywords.extend(spans)

        # Resolve image replacements, downloading remote images first.
        img_pairs = []
        for item in image_replacements or []:
            try:
                idx = int(item.get("index"))
            except (TypeError, ValueError):
                continue
            path = item.get("file")
            if not path:
                continue
            local_img = path
            if _is_url(path):
                parsed = urllib.parse.urlparse(path)
                suffix = os.path.splitext(parsed.path)[1] or ".img"
                local_img = _download_to_temp(path, suffix=suffix)
                tmp_files.append(local_img)
            if not os.path.exists(local_img):
                raise FileNotFoundError(f"图片文件不存在: {path}")
            img_pairs.append((idx, local_img))

        if input_is_remote:
            # The downloaded copy is deleted on exit, so the result must go
            # elsewhere. The upload directory is what /download/{filename}
            # serves, which keeps the returned output_url valid.
            # NOTE(review): assumes process() accepts distinct input and
            # output paths, as its separate parameters suggest - confirm.
            fd, out_path = tempfile.mkstemp(prefix="edited_", suffix=input_ext, dir=_get_upload_dir())
            os.close(fd)
            output_docx = out_path
        else:
            # A local input is edited in place.
            output_docx = local_input

        process(
            input_docx=local_input,
            output_docx=output_docx,
            replacements=rep_pairs,
            image_replacements=img_pairs,
            color_keywords=color_keywords,
        )
        abs_out = os.path.abspath(output_docx)
        result = {
            "output_path": abs_out,
            "output_url": _build_output_url(abs_out),
        }
        succeeded = True
        return result
    finally:
        if out_path is not None and not succeeded:
            # Do not leave an empty or partial output behind after a failure.
            tmp_files.append(out_path)
        for p in tmp_files:
            try:
                os.remove(p)
            except OSError:
                pass
# Remote HTTP mode: routes for uploading and downloading files
from starlette.responses import FileResponse, JSONResponse
from starlette.requests import Request
@mcp.custom_route("/upload", methods=["POST"])
async def upload_handler(request: Request):
    """Handle ``POST /upload``: store the ``file`` form field in the upload directory.

    Only the base name of the submitted filename is used. If that name is
    already taken, ``_1``, ``_2``, ... is appended before the extension.

    Responses (JSON):
        200: success, with "filename", "file_path", "file_url", "size" and
             "message".
        400: the request has no usable ``file`` upload (field missing, a
             plain text field, or an empty filename).
        500: any other failure while reading or saving the upload.
    """
    try:
        form = await request.form()
        file = form.get("file")
        # A plain text field arrives as a string with no filename attribute;
        # treat that, and an empty filename, as "no file" rather than
        # letting it surface as a 500.
        raw_name = getattr(file, "filename", None)
        # basename() drops directory components, preventing path traversal.
        filename = os.path.basename(raw_name) if raw_name else ""
        if not file or not filename:
            return JSONResponse({
                "success": False,
                "message": "未提供文件"
            }, status_code=400)
        upload_dir = _get_upload_dir()
        content = await file.read()
        base, ext = os.path.splitext(filename)
        counter = 1
        while True:
            file_path = os.path.join(upload_dir, filename)
            try:
                # Mode "xb" creates the file exclusively, so a concurrent
                # upload of the same name can never be overwritten.
                with open(file_path, "xb") as f:
                    f.write(content)
                break
            except (FileExistsError, IsADirectoryError):
                # Name taken: try the next numbered variant.
                filename = f"{base}_{counter}{ext}"
                counter += 1
        return JSONResponse({
            "success": True,
            "filename": filename,
            "file_path": file_path,
            "file_url": _build_output_url(file_path),
            "size": len(content),
            "message": f"文件上传成功: {filename}"
        })
    except Exception as e:
        # Route boundary: report the failure to the client as JSON.
        return JSONResponse({
            "success": False,
            "message": f"文件上传失败: {str(e)}"
        }, status_code=500)
@mcp.custom_route("/download/{filename}", methods=["GET"])
async def download_handler(request: Request):
    """Handle ``GET /download/{filename}``: send a file from the upload directory.

    Responses:
        200: the file, sent as ``application/octet-stream``.
        400: the name exists in the upload directory but is not a regular file.
        404: nothing with that name exists in the upload directory.
        500: any unexpected failure (JSON body with the error text).
    """
    try:
        requested = request.path_params.get("filename")
        storage_root = _get_upload_dir()
        # Keep only the final path component so the lookup cannot leave the
        # upload directory.
        filename = os.path.basename(requested)
        target = os.path.join(storage_root, filename)

        if os.path.isfile(target):
            return FileResponse(
                target,
                filename=filename,
                media_type="application/octet-stream",
            )

        if os.path.exists(target):
            body = {"success": False, "message": f"不是文件: {filename}"}
            return JSONResponse(body, status_code=400)

        body = {"success": False, "message": f"文件不存在: {filename}"}
        return JSONResponse(body, status_code=404)
    except Exception as e:
        body = {"success": False, "message": f"文件下载失败: {str(e)}"}
        return JSONResponse(body, status_code=500)
if __name__ == "__main__":
    # Command-line entry point: parse the options, record the chosen
    # transport in _server_config, then start the MCP server.
    import sys

    parser = argparse.ArgumentParser(description="DOCX MCP 服务器")
    parser.add_argument(
        "--transport",
        choices=["stdio", "http"],
        default="stdio",
        help="传输方式stdio本地或 http远程 HTTP /streamable-http",
    )
    parser.add_argument(
        "--host",
        default="0.0.0.0",
        help="HTTP 模式监听地址(默认 0.0.0.0",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=8080,
        help="HTTP 模式监听端口(默认 8080",
    )
    args = parser.parse_args()
    if args.transport == "http":
        # Record the listen address so download URLs can be built from it.
        _server_config["host"] = args.host
        _server_config["port"] = args.port
        _server_config["transport"] = "http"
        # Apply the same address to the FastMCP settings before starting.
        mcp.settings.host = args.host
        mcp.settings.port = args.port
        print(f"🚀 MCP HTTP 服务器启动中 → http://{args.host}:{args.port}/mcp")
        mcp.run(transport="streamable-http")
    else:
        # Local stdio mode.
        _server_config["transport"] = "stdio"
        # stdout is the protocol channel under the stdio transport, so the
        # startup banner must go to stderr to avoid corrupting the stream.
        print("🚀 MCP stdio 模式启动中(本地使用)", file=sys.stderr)
        mcp.run(transport="stdio")