Files
mcp/mcp_docx_server.py
2026-03-20 20:12:17 +08:00

649 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
基于 mcp_docx.py 封装的 MCP 服务器。
暴露两个主要 MCP 工具:
- list_docx_images列出 DOCX 中的图片信息
- edit_docx: 进行文本替换 / 关键字上色 / 图片替换
额外提供 HTTP 文件接口(仅在 http 模式下可用):
- POST /upload: 上传文件到服务器
- GET /files/{filename}: 下载服务器上的文件
当前推荐的传输方式:
- stdio本地调试
- streamable-http远程 HTTP路径固定为 /mcp推荐
用法:
# 本地 stdio 模式(默认)
python mcp_docx_server.py --transport stdio
# HTTP 远程模式(推荐,默认 0.0.0.0:8080对外暴露 /mcp
python mcp_docx_server.py --transport http
python mcp_docx_server.py --transport http --host 0.0.0.0 --port 8080
# 客户端连接地址http 模式):
# MCP 端点: http://<host>:<port>/mcp
注意:底层仍然完全复用 mcp_docx.py 中的逻辑,只是通过 MCP SDK 对外提供。
"""
import argparse
import os
import tempfile
import urllib.parse
from datetime import datetime, date, timedelta
from typing import Any, Dict, List, Optional
import uuid
import requests
from lxml import etree
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from mcp_docx import (
W,
get_images_info,
process,
_parse_span_replacement,
paragraph_replace,
unpack,
pack,
)
_disable_dns_rebinding = os.getenv("MCP_DISABLE_HOST_CHECK") == "1"
if _disable_dns_rebinding:
# 参考 python-sdk 官方文档:关闭 DNS rebinding 防护(适合本地或已由外层网关做安全控制的环境)
# https://github.com/modelcontextprotocol/python-sdk/issues/1798
transport_security = TransportSecuritySettings(
enable_dns_rebinding_protection=False,
)
else:
# 默认:开启 DNS rebinding 防护,但允许本机访问
# 如需通过网关 / 域名访问,可在这里追加 allowed_hosts / allowed_origins
transport_security = TransportSecuritySettings(
enable_dns_rebinding_protection=True,
allowed_hosts=["localhost:*", "127.0.0.1:*", "192.168.10.101:*"],
allowed_origins=["http://localhost:*", "http://127.0.0.1:*","http://192.168.10.101:*"],
)
mcp = FastMCP(
"docx-editor",
transport_security=transport_security,
)
# 全局变量:存储服务器配置
_server_config = {
"host": None,
"port": None,
"transport": None,
}
def _normalize_report_type(report_type: Optional[str]) -> Optional[str]:
if not report_type:
return None
t = str(report_type).strip().lower()
mapping = {
"日报": "daily",
"日報": "daily",
"daily": "daily",
"d": "daily",
"周报": "weekly",
"週報": "weekly",
"weekly": "weekly",
"w": "weekly",
"月报": "monthly",
"月報": "monthly",
"monthly": "monthly",
"m": "monthly",
}
return mapping.get(report_type, mapping.get(t))
def _build_issue_text(norm_type: Optional[str], now: datetime) -> str:
"""根据报告类型和生成时间计算“期数 + 日期”字符串。"""
d = now.date()
date_str = f"{d.year}{d.month}{d.day}"
if norm_type == "daily":
# 日报:只有日期,没有期数
return date_str
if norm_type == "weekly":
# 周报:根据“当周周一所在月份”的周序号来计算期数
monday = d - timedelta(days=d.weekday())
year = monday.year
month = monday.month
first_day = date(year, month, 1)
offset = (0 - first_day.weekday()) % 7 # 距离第一个周一的天数
first_monday = first_day + timedelta(days=offset)
issue_no = ((monday - first_monday).days // 7) + 1
if issue_no < 1:
issue_no = 1
return f"{date_str}(第{issue_no}期)"
# 默认:月报逻辑,期数固定为第一期
return f"{date_str}第1期"
def _apply_report_date_logic_to_docx(
docx_path: str,
report_type: Optional[str],
report_title_time: Optional[str],
) -> None:
"""
只在“目录”之前的内容中,按照规则替换日期相关文本:
- 匹配第一个形如 YYYY年M月 的片段 → 替换为 report_title_time
- 匹配第一个形如 YYYY年M月D日第X期 的片段 →
按报告类型 + 当前生成时间计算期数和日期,并进行替换。
"""
norm_type = _normalize_report_type(report_type)
if not norm_type and not report_title_time:
return
# 没有任何需要替换的目标,直接返回
if not os.path.exists(docx_path):
return
with tempfile.TemporaryDirectory() as tmpdir:
unpack(docx_path, tmpdir)
doc_xml_path = os.path.join(tmpdir, "word", "document.xml")
if not os.path.exists(doc_xml_path):
return
tree = etree.parse(doc_xml_path)
root = tree.getroot()
title_replaced = False
issue_replaced = False
now = datetime.now()
# 正则模式
import re
pattern_title = re.compile(r"(\d{4})年(\d{1,2})月")
pattern_issue = re.compile(r"(\d{4})年(\d{1,2})月(\d{1,2})日(第(\d+)期)")
for p in root.iter(f"{{{W}}}p"):
# 聚合段落文本
t_nodes = list(p.iter(f"{{{W}}}t"))
full = "".join(t.text or "" for t in t_nodes)
# 遇到“目录”后就停止处理后面的内容
if "目录" in full:
break
para_repls = []
if report_title_time and not title_replaced:
m = pattern_title.search(full)
if m:
old = m.group(0)
new = report_title_time
para_repls.append((old, new))
title_replaced = True
if norm_type and not issue_replaced:
m2 = pattern_issue.search(full)
if m2:
old2 = m2.group(0)
new2 = _build_issue_text(norm_type, now)
para_repls.append((old2, new2))
issue_replaced = True
if para_repls:
paragraph_replace(p, para_repls)
tree.write(doc_xml_path, xml_declaration=True, encoding="UTF-8", standalone=True)
# 重新打包覆盖原始 DOCX
pack(tmpdir, docx_path, docx_path)
def _is_url(path: str) -> bool:
"""简单判断一个字符串是否为 HTTP/HTTPS URL。"""
return path.startswith("http://") or path.startswith("https://")
def _download_to_temp(url: str, suffix: str = ".tmp") -> str:
"""
将远程 URL 下载到临时文件,返回本地临时路径。
调用方负责在使用完毕后删除该文件。
"""
resp = requests.get(url, stream=True, timeout=30)
resp.raise_for_status()
fd, tmp_path = tempfile.mkstemp(suffix=suffix)
try:
with os.fdopen(fd, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
except Exception:
# 出错时清理临时文件
try:
os.remove(tmp_path)
except OSError:
pass
raise
return tmp_path
def _build_output_url(abs_output_path: str) -> Optional[str]:
"""
构造输出文件的下载 URL。
优先使用环境变量 MCP_OUTPUT_BASE_URL否则根据服务器配置自动构建。
约定:
- 如果设置了 MCP_OUTPUT_BASE_URL: 使用该 URL 作为基础
- 否则在 http 模式下: http://host:port/download/{filename}
- stdio 模式下: 返回 None
"""
# 优先使用环境变量
base = os.getenv("MCP_OUTPUT_BASE_URL")
if base:
filename = os.path.basename(abs_output_path)
return base.rstrip("/") + "/" + filename
# 如果是 http 模式,自动构建下载 URL
if _server_config["transport"] == "http":
host = _server_config["host"]
port = _server_config["port"]
filename = os.path.basename(abs_output_path)
# 如果 host 是 0.0.0.0,尝试使用更具体的地址
if host == "0.0.0.0":
# 优先使用环境变量指定的公网地址
public_host = os.getenv("MCP_PUBLIC_HOST")
if public_host:
host = public_host
else:
# 默认使用 localhost
host = "localhost"
return f"http://{host}:{port}/download/{filename}"
return None
def _get_upload_dir() -> str:
"""
获取文件上传目录。
优先使用环境变量 MCP_UPLOAD_DIR否则使用当前目录下的 uploads 文件夹。
"""
upload_dir = os.getenv("MCP_UPLOAD_DIR", "./uploads")
os.makedirs(upload_dir, exist_ok=True)
return os.path.abspath(upload_dir)
def _get_tmp_upload_dir() -> str:
"""
获取临时上传目录。
优先使用环境变量 MCP_TMP_UPLOAD_DIR否则使用当前目录下的 tmp 文件夹。
"""
tmp_dir = os.getenv("MCP_TMP_UPLOAD_DIR", "./tmp")
os.makedirs(tmp_dir, exist_ok=True)
return os.path.abspath(tmp_dir)
@mcp.tool()
async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
"""
列出指定 DOCX 文件中的所有图片信息。
参数:
- docx_url: 文件的HTTP/HTTPS URL。
返回:
- 图片信息列表,每一项包含:
- index: 图片在文档中的顺序(从 1 开始)
- media_file: DOCX 内部的资源路径
- ext: 图片扩展名
- docpr_name: Word 内部的图片名称
- width_cm / height_cm: 近似尺寸(厘米),可能为 None
"""
imgs = get_images_info(_download_to_temp(docx_url, suffix=".docx"))
return imgs
@mcp.custom_route("/edit_docx", methods=["POST"])
async def edit_docx(
input_docx_path: str,
replacements: List[Dict[str, str]] = None,
image_replacements: Optional[List[Dict[str, Any]]] = None,
report_type: Optional[str] = None,
report_title_time: Optional[str] = None,
) -> Dict[str, Any]:
"""
对 DOCX 文件进行编辑。
支持:
- 纯文本替换
- 通过 <span color="red">关键字</span> 语法设置关键字颜色
- 替换指定序号的图片
- 报告日期与期数自动替换(仅在“目录”之前生效)
参数:
- input_docx_path: 输入 DOCX 文件名称
- replacements: 文本替换规则列表,例如:
[
{"old": "计划作业总数共有10项。", "new": "计划作业总数共有<span color='red'>XX</span>项。"},
{"old": "文档原文本,必须是完整的一句话或者段落", "new": "要替换的文本"}
]
- image_replacements: 图片替换规则
- report_type: 报告类型,可选值:日报 / 周报 / 月报(或对应的英文 daily / weekly / monthly
- report_title_time: 报告标题中要显示的时间字符串用来替换“YYYY年M月”这一段仅在第一次匹配时生效
返回:
- {
"output_path": 生成的 DOCX 绝对路径,
"output_url": 如果配置了 MCP_OUTPUT_BASE_URL则为可访问该文件的 URL否则为 null
}
"""
tmp_input: Optional[str] = None
tmp_images: List[str] = []
print(f"edit_docx: input_docx_path: {input_docx_path}, replacements: {replacements}, image_replacements: {image_replacements}")
try:
upload_dir = _get_upload_dir() # 输出目录:/uploads
tmp_upload_dir = _get_tmp_upload_dir() # 上传临时目录:/tmp
# 解析输入路径:支持 URL、绝对路径、仅文件名三种形式
local_input = input_docx_path
if _is_url(input_docx_path):
parsed = urllib.parse.urlparse(input_docx_path)
ext = os.path.splitext(parsed.path)[1] or ".docx"
tmp_input = _download_to_temp(input_docx_path, suffix=ext)
local_input = tmp_input
elif not os.path.isabs(local_input):
# 相对路径:优先在 tmp其次在 uploads 中查找
cand_tmp = os.path.join(tmp_upload_dir, input_docx_path)
cand_upload = os.path.join(upload_dir, input_docx_path)
if os.path.exists(cand_tmp):
local_input = cand_tmp
else:
local_input = cand_upload
if not os.path.exists(local_input):
raise FileNotFoundError(f"输入 DOCX 文件不存在: {input_docx_path}")
if replacements is None:
replacements = []
if image_replacements is None:
image_replacements = []
# 解析文本替换与颜色关键字(复用 CLI 逻辑)
rep_pairs = []
color_keywords = []
for item in replacements:
old = item.get("old")
new_raw = item.get("new")
if not old:
continue
if new_raw is None:
new_raw = ""
new_plain, spans = _parse_span_replacement(new_raw)
rep_pairs.append((old, new_plain))
color_keywords.extend(spans)
# 处理图片替换参数(支持本地路径或 URL
img_pairs = []
for item in image_replacements:
try:
idx = int(item.get("index"))
except (TypeError, ValueError):
continue
path = item.get("file")
if not path:
continue
local_img = path
if _is_url(path):
parsed = urllib.parse.urlparse(path)
ext = os.path.splitext(parsed.path)[1] or ""
suffix = ext if ext else ".img"
tmp_img = _download_to_temp(path, suffix=suffix)
tmp_images.append(tmp_img)
local_img = tmp_img
if not os.path.exists(local_img):
raise FileNotFoundError(f"图片文件不存在: {path}")
img_pairs.append((idx, local_img))
# 复用原始处理函数:
# 输出文件统一写入 /uploads 目录,文件名带时间戳和随机后缀避免并发冲突
base_name = os.path.basename(local_input)
name_root, _ = os.path.splitext(base_name)
ts = datetime.now().strftime('%Y%m%d%H%M%S')
rand = uuid.uuid4().hex[:6]
output_filename = f"{name_root}_output_{ts}_{rand}.docx"
output_docx = os.path.join(upload_dir, output_filename)
process(
input_docx=local_input,
output_docx=output_docx,
replacements=rep_pairs,
image_replacements=img_pairs,
color_keywords=color_keywords,
)
# 追加:根据报告类型与标题时间,在“目录”之前自动处理日期和期数
if report_type or report_title_time:
try:
_apply_report_date_logic_to_docx(
output_docx,
report_type=report_type,
report_title_time=report_title_time,
)
except Exception as e:
# 避免因为日期处理失败而导致整个接口报错,把错误写到日志即可
print(f"apply report date logic failed: {e}")
abs_out = os.path.abspath(output_docx)
# 删除上传的临时文件:只删除位于 tmp 目录中的输入文件
try:
tmp_root = _get_tmp_upload_dir()
if os.path.exists(local_input):
abs_input = os.path.abspath(local_input)
if os.path.commonpath([abs_input, tmp_root]) == tmp_root:
os.remove(local_input)
except Exception:
# 不因清理失败影响主流程
pass
return {
"output_path": output_docx,
"output_url": _build_output_url(output_docx),
}
finally:
if tmp_input and os.path.exists(tmp_input):
try:
os.remove(tmp_input)
except OSError:
pass
for p in tmp_images:
if os.path.exists(p):
try:
os.remove(p)
except OSError:
pass
# HTTP 远程模式:添加文件上传下载路由
from starlette.responses import FileResponse, JSONResponse
from starlette.requests import Request
def _get_log_path() -> str:
"""
获取日志文件路径。
优先使用环境变量 MCP_LOG_FILE完整路径否则使用当前目录下的 logs/mcp.log。
"""
log_file = os.getenv("MCP_LOG_FILE", "./logs/mcp.log")
log_path = os.path.abspath(log_file)
os.makedirs(os.path.dirname(log_path), exist_ok=True)
return log_path
@mcp.custom_route("/log", methods=["POST"])
async def append_log(request: Request):
"""
将一段字符串追加写入日志文件,每行带时间戳。
参数:
- message: 要写入的字符串内容。
返回:
- JSON 格式:
{"success": True/False, "log_path": 日志文件路径, "message": 说明}
"""
try:
# data = await request.json()
data = await request.body()
if not data:
return JSONResponse(
{
"success": False,
"message": "未提供消息内容",
},
status_code=400,
)
log_path = _get_log_path()
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{ts}] {data.decode('utf-8')}\n"
with open(log_path, "a", encoding="utf-8") as f:
f.write(line)
return JSONResponse(
{
"success": True,
"log_path": log_path,
"message": "已写入日志",
}
)
except Exception as e:
return JSONResponse(
{
"success": False,
"log_path": _get_log_path(),
"message": f"写入日志失败: {str(e)}",
},
status_code=500,
)
@mcp.custom_route("/upload", methods=["POST"])
async def upload_handler(request: Request):
"""处理文件上传"""
try:
form = await request.form()
file = form.get("file")
if not file:
return JSONResponse({
"success": False,
"message": "未提供文件"
}, status_code=400)
tmp_dir = _get_tmp_upload_dir()
orig_filename = file.filename or "uploaded.docx"
# 安全检查:防止路径遍历攻击,保留原始文件名
filename = os.path.basename(orig_filename)
file_path = os.path.join(tmp_dir, filename)
# 保存文件到临时目录(如已存在则覆盖)
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
return JSONResponse({
"success": True,
"filename": filename, # 保留原始文件名,供 edit_docx 使用
"file_path": file_path, # 绝对路径(可选)
"file_url": None, # 临时文件不提供下载 URL
"size": len(content),
"message": f"文件上传成功: {filename}"
})
except Exception as e:
return JSONResponse({
"success": False,
"message": f"文件上传失败: {str(e)}"
}, status_code=500)
@mcp.custom_route("/download/{filename}", methods=["GET"])
async def download_handler(request: Request):
"""处理文件下载"""
try:
filename = request.path_params.get("filename")
upload_dir = _get_upload_dir()
# 安全检查:防止路径遍历攻击
filename = os.path.basename(filename)
file_path = os.path.join(upload_dir, filename)
if not os.path.exists(file_path):
return JSONResponse({
"success": False,
"message": f"文件不存在: {filename}"
}, status_code=404)
if not os.path.isfile(file_path):
return JSONResponse({
"success": False,
"message": f"不是文件: {filename}"
}, status_code=400)
return FileResponse(
file_path,
filename=filename,
media_type="application/octet-stream"
)
except Exception as e:
return JSONResponse({
"success": False,
"message": f"文件下载失败: {str(e)}"
}, status_code=500)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DOCX MCP 服务器")
parser.add_argument(
"--transport",
choices=["stdio", "http"],
default="stdio",
help="传输方式stdio本地或 http远程 HTTP /streamable-http",
)
parser.add_argument(
"--host",
default="0.0.0.0",
help="HTTP 模式监听地址(默认 0.0.0.0",
)
parser.add_argument(
"--port",
type=int,
default=8080,
help="HTTP 模式监听端口(默认 8080",
)
args = parser.parse_args()
if args.transport == "http":
# 保存服务器配置到全局变量
_server_config["host"] = args.host
_server_config["port"] = args.port
_server_config["transport"] = "http"
# 启动 MCP 服务器(会自动集成到 uvicorn
mcp.settings.host = args.host
mcp.settings.port = args.port
# 将自定义路由注入到 MCP 服务器
print(f"🚀 MCP HTTP 服务器启动中 → http://{args.host}:{args.port}/mcp")
# 注意FastMCP 使用 Starlette我们需要扩展其路由
mcp.run(transport="streamable-http")
else:
# 本地 stdio 模式
_server_config["transport"] = "stdio"
print("🚀 MCP stdio 模式启动中(本地使用)")
mcp.run(transport="stdio")