This commit is contained in:
2026-03-20 19:19:26 +08:00
parent f009258769
commit 248f7a2637
4 changed files with 457 additions and 53 deletions

File diff suppressed because one or more lines are too long

View File

@@ -2,16 +2,14 @@ version: "3.9"
services:
mcp-docx-server:
build:
context: .
dockerfile: Dockerfile
container_name: mcp-docx-server
image: mcp-mcp-9nfjir-mcp-docx-server:latest
working_dir: /app
# SSE MCP 服务端口
ports:
- "8080:8080"
environment:
MCP_OUTPUT_BASE_URL: http://149.88.66.186:8080/download
MCP_OUTPUT_BASE_URL: http://192.168.10.114:8080/download
# 可选:挂载数据目录供 DOCX 读写
volumes:
- /root/uploads:/app/uploads

View File

@@ -29,6 +29,7 @@ import tempfile
import zipfile
from lxml import etree
from PIL import Image
import re
W = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'
WD = 'http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing'
@@ -260,16 +261,39 @@ def set_color_on_rpr(rpr_el, hex_color):
c = etree.SubElement(rpr_el, f'{{{W}}}color')
c.set(f'{{{W}}}val', hex_color.lstrip('#'))
def apply_color_to_keyword(doc_el, keyword, hex_color):
def apply_color_to_keyword(doc_el, keyword, hex_color, context_text=None):
"""
只给匹配到的关键字本身着色,而不是整个 run。
做法:在有关键字的 run 上,把文本拆成多段 run
[前缀][关键字][后缀],只有“关键字”这个 run 设置颜色。
当 context_text 不为空时,只在“整段文本包含该 context_text 的段落”中进行上色,
避免同一个关键字在其他段落里被误伤(例如单独的数字 0
"""
# 如果提供了上下文,只在包含该上下文的段落内着色
allowed_paras = None
if context_text:
allowed_paras = set()
for p in doc_el.iter(f'{{{W}}}p'):
t_nodes = list(p.iter(f'{{{W}}}t'))
full = ''.join(t.text or '' for t in t_nodes)
if context_text in full:
allowed_paras.add(p)
def _find_ancestor_para(el):
cur = el
while cur is not None and cur.tag != f'{{{W}}}p':
cur = cur.getparent()
return cur
# 先 list 一下,避免在遍历时修改树结构导致问题
runs = list(doc_el.iter(f'{{{W}}}r'))
for run in runs:
if allowed_paras is not None:
para = _find_ancestor_para(run)
if para not in allowed_paras:
continue
t_nodes = list(run.findall(f'{{{W}}}t'))
if not t_nodes:
continue
@@ -322,6 +346,65 @@ def apply_color_to_keyword(doc_el, keyword, hex_color):
if seg_text:
parent.insert(insert_pos + offset, make_run(seg_text, colored))
def remove_rule_blocks(doc_el):
    """
    Delete every paragraph that belongs to a rule block.

    A block opens at a paragraph whose text contains ``<global_rule>``,
    ``<rule>`` or ``<chart_rule>`` and closes at the paragraph whose text
    contains the matching closing tag. Blocks may span several paragraphs,
    so paragraphs are scanned in document order while tracking which kinds
    of block are currently open.

    A paragraph is removed when:
    - at least one block was already open when the paragraph was reached
      (this covers block content, empty paragraphs inside a block, and the
      paragraph holding the closing tag), or
    - the paragraph itself contains an opening tag.

    Args:
        doc_el: element tree node to scan; it is modified in place.

    Returns:
        None.
    """
    rule_tags = ('global_rule', 'rule', 'chart_rule')
    open_blocks = set()
    doomed = []

    # Snapshot the paragraphs first; the tree is only mutated after the scan.
    for p in list(doc_el.iter(f'{{{W}}}p')):
        text = ''.join(t.text or '' for t in p.iter(f'{{{W}}}t'))

        # Decide from the state on entry, so the paragraph carrying a closing
        # tag is still removed even though it closes the block.
        drop = bool(open_blocks)

        for tag in rule_tags:
            if f'<{tag}>' in text:
                open_blocks.add(tag)
                drop = True
            if f'</{tag}>' in text:
                open_blocks.discard(tag)

        # Each paragraph is visited once, so no duplicate check is needed.
        if drop:
            doomed.append(p)

    for p in doomed:
        parent = p.getparent()
        if parent is not None:
            parent.remove(p)
def process(input_docx, output_docx, replacements, image_replacements,
color_keywords):
with tempfile.TemporaryDirectory() as tmpdir:
@@ -338,15 +421,24 @@ def process(input_docx, output_docx, replacements, image_replacements,
tree = etree.parse(doc_xml_path)
root = tree.getroot()
# 先整体删除全局规则和普通规则块(支持标签跨段落)
remove_rule_blocks(root)
if replacements:
print(f"✏️ 替换 {len(replacements)} 条文本...")
for para in root.iter(f'{{{W}}}p'):
paragraph_replace(para, replacements)
# 根据 span 解析出的关键字上色
for keyword, color in color_keywords:
for item in color_keywords:
# 兼容旧格式: (keyword, color)
if len(item) == 2:
keyword, color = item
context_text = None
else:
keyword, color, context_text = item
print(f"🎨 关键词「{keyword}」→ #{color}")
apply_color_to_keyword(root, keyword, color)
apply_color_to_keyword(root, keyword, color, context_text)
tree.write(doc_xml_path, xml_declaration=True, encoding='UTF-8', standalone=True)
print(f"📦 打包 → {output_docx} ...")
@@ -359,27 +451,75 @@ def _parse_span_replacement(new_text):
解析 NEW 文本中的 span 标签,用于决定颜色。
约定格式(不区分大小写):
<span color="FF0000">待补充</span>
<span color="#FF0000">待补充</span>
<span color="red">待补充</span>
返回: (纯文本, [(keyword, hex_color), ...])
"""
import re
# 简单的命名颜色到 16 进制的映射,可按需扩展
named_colors = {
'red': 'FF0000',
'blue': '0000FF',
'green': '00FF00',
'yellow': 'FFFF00',
'black': '000000',
'white': 'FFFFFF',
'gray': '808080',
'grey': '808080',
}
def _normalize_color(raw_color: str) -> str:
"""
支持:
- FFFFFF / ffffff
- #FFFFFF / #ffffff
- red / blue 等命名颜色(见 named_colors
返回不带 # 的大写 16 进制字符串;如果无法识别命名颜色则原样返回(去掉 #)。
"""
c = (raw_color or '').strip()
if not c:
return ''
# 去掉前导 #
if c.startswith('#'):
c = c[1:]
# 纯 16 进制
if re.fullmatch(r'[0-9a-fA-F]{6}', c):
return c.upper()
# 命名颜色
mapped = named_colors.get(c.lower())
if mapped:
return mapped
# 兜底:返回去掉 # 的原值
return c.upper()
# color 属性允许:
# - 6 位 16 进制(可带 #
# - 命名颜色red / blue ...
span_pattern = re.compile(
r'<span\s+[^>]*?color=["\']?(#?[0-9a-fA-F]{6})["\']?[^>]*>(.*?)</span>',
r'<span\s+[^>]*?color=["\']?([^"\'\s>]+)["\']?[^>]*>(.*?)</span>',
re.IGNORECASE | re.DOTALL,
)
# 先得到去掉 span 标签后的纯文本(也是最终会写入 DOCX 的内容)
def _strip_repl(m):
return m.group(2)
plain_text = span_pattern.sub(_strip_repl, new_text)
# 再次遍历 span收集颜色关键字并把“整句纯文本”作为上下文挂在每个关键字上
color_keywords = []
def _repl(m):
hex_color = m.group(1).lstrip('#')
for m in span_pattern.finditer(new_text):
raw_color = m.group(1)
hex_color = _normalize_color(raw_color)
keyword = m.group(2)
color_keywords.append((keyword, hex_color))
return keyword
# 三元组: (关键字, 颜色, 该 NEW 对应的整句纯文本上下文)
color_keywords.append((keyword, hex_color, plain_text))
plain_text = span_pattern.sub(_repl, new_text)
return plain_text, color_keywords

View File

@@ -32,13 +32,24 @@ import argparse
import os
import tempfile
import urllib.parse
from datetime import datetime, date, timedelta
from typing import Any, Dict, List, Optional
import uuid
import requests
from lxml import etree
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
from mcp_docx import get_images_info, process, _parse_span_replacement
from mcp_docx import (
W,
get_images_info,
process,
_parse_span_replacement,
paragraph_replace,
unpack,
pack,
)
_disable_dns_rebinding = os.getenv("MCP_DISABLE_HOST_CHECK") == "1"
@@ -53,8 +64,8 @@ else:
# 如需通过网关 / 域名访问,可在这里追加 allowed_hosts / allowed_origins
transport_security = TransportSecuritySettings(
enable_dns_rebinding_protection=True,
allowed_hosts=["localhost:*", "127.0.0.1:*","149.88.66.186:*"],
allowed_origins=["http://localhost:*", "http://127.0.0.1:*","http://149.88.66.186:*"],
allowed_hosts=["localhost:*", "127.0.0.1:*", "192.168.10.101:*"],
allowed_origins=["http://localhost:*", "http://127.0.0.1:*","http://192.168.10.101:*"],
)
@@ -71,6 +82,127 @@ _server_config = {
}
def _normalize_report_type(report_type: Optional[str]) -> Optional[str]:
if not report_type:
return None
t = str(report_type).strip().lower()
mapping = {
"日报": "daily",
"日報": "daily",
"daily": "daily",
"d": "daily",
"周报": "weekly",
"週報": "weekly",
"weekly": "weekly",
"w": "weekly",
"月报": "monthly",
"月報": "monthly",
"monthly": "monthly",
"m": "monthly",
}
return mapping.get(report_type, mapping.get(t))
def _build_issue_text(norm_type: Optional[str], now: datetime) -> str:
"""根据报告类型和生成时间计算“期数 + 日期”字符串。"""
d = now.date()
date_str = f"{d.year}{d.month}{d.day}"
if norm_type == "daily":
# 日报:只有日期,没有期数
return date_str
if norm_type == "weekly":
# 周报:根据“当周周一所在月份”的周序号来计算期数
monday = d - timedelta(days=d.weekday())
year = monday.year
month = monday.month
first_day = date(year, month, 1)
offset = (0 - first_day.weekday()) % 7 # 距离第一个周一的天数
first_monday = first_day + timedelta(days=offset)
issue_no = ((monday - first_monday).days // 7) + 1
if issue_no < 1:
issue_no = 1
return f"{date_str}(第{issue_no}期)"
# 默认:月报逻辑,期数固定为第一期
return f"{date_str}第1期"
def _apply_report_date_logic_to_docx(
    docx_path: str,
    report_type: Optional[str],
    report_title_time: Optional[str],
) -> None:
    """
    Rewrite date-related text in the part of the document before "目录".

    Paragraphs are scanned in order, and scanning stops at the first
    paragraph whose text contains "目录". Within the scanned part:
    - the first match of the title pattern (year + month) is replaced with
      ``report_title_time``, when that argument is given;
    - the first match of the issue pattern (year + month + day + issue
      number) is replaced with the text from ``_build_issue_text`` for the
      normalized report type and the current time, when the type is known.

    Nothing happens when neither replacement is requested, when
    ``docx_path`` does not exist, or when the unpacked archive has no
    ``word/document.xml``. Otherwise the XML is written back and the archive
    is repacked onto ``docx_path``.

    NOTE(review): the title pattern also matches the leading "year + month"
    of any text the issue pattern matches, so the title replacement can land
    on an issue-style date if that comes first — confirm this is acceptable.
    """
    import re

    kind = _normalize_report_type(report_type)
    nothing_requested = not (kind or report_title_time)
    if nothing_requested or not os.path.exists(docx_path):
        return

    title_re = re.compile(r"(\d{4})年(\d{1,2})月")
    issue_re = re.compile(r"(\d{4})年(\d{1,2})月(\d{1,2})日(第(\d+)期)")
    para_tag = f"{{{W}}}p"
    text_tag = f"{{{W}}}t"

    with tempfile.TemporaryDirectory() as workdir:
        unpack(docx_path, workdir)
        xml_file = os.path.join(workdir, "word", "document.xml")
        if not os.path.exists(xml_file):
            return

        document = etree.parse(xml_file)
        generated_at = datetime.now()

        # Each replacement is applied at most once per document.
        title_pending = bool(report_title_time)
        issue_pending = bool(kind)

        for para in document.getroot().iter(para_tag):
            # Join the text of every text node under the paragraph.
            text = "".join(node.text or "" for node in para.iter(text_tag))

            # Everything from the table-of-contents heading on is left alone.
            if "目录" in text:
                break

            edits = []

            if title_pending:
                hit = title_re.search(text)
                if hit is not None:
                    edits.append((hit.group(0), report_title_time))
                    title_pending = False

            if issue_pending:
                hit = issue_re.search(text)
                if hit is not None:
                    edits.append(
                        (hit.group(0), _build_issue_text(kind, generated_at))
                    )
                    issue_pending = False

            if edits:
                paragraph_replace(para, edits)

        document.write(
            xml_file, xml_declaration=True, encoding="UTF-8", standalone=True
        )
        # Repack over the original DOCX.
        pack(workdir, docx_path, docx_path)
def _is_url(path: str) -> bool:
"""简单判断一个字符串是否为 HTTP/HTTPS URL。"""
return path.startswith("http://") or path.startswith("https://")
@@ -151,6 +283,17 @@ def _get_upload_dir() -> str:
return os.path.abspath(upload_dir)
def _get_tmp_upload_dir() -> str:
"""
获取临时上传目录。
优先使用环境变量 MCP_TMP_UPLOAD_DIR否则使用当前目录下的 tmp 文件夹。
"""
tmp_dir = os.getenv("MCP_TMP_UPLOAD_DIR", "./tmp")
os.makedirs(tmp_dir, exist_ok=True)
return os.path.abspath(tmp_dir)
@mcp.tool()
async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
"""
@@ -171,46 +314,61 @@ async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
return imgs
@mcp.tool()
async def edit_docx(input_docx_path: str, replacements: Optional[List[Dict[str, str]]] = None, image_replacements: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
async def edit_docx(
input_docx_path: str,
replacements: List[Dict[str, str]] = None,
image_replacements: Optional[List[Dict[str, Any]]] = None,
report_type: Optional[str] = None,
report_title_time: Optional[str] = None,
) -> Dict[str, Any]:
"""
使用原始 mcp_docx 逻辑对 DOCX 文件进行编辑。
对 DOCX 文件进行编辑。
支持:
- 纯文本替换
- 通过 <span color=\"FF0000\">关键字</span> 语法设置关键字颜色
- 通过 <span color="red">关键字</span> 语法设置关键字颜色
- 替换指定序号的图片
- 报告日期与期数自动替换(仅在“目录”之前生效)
参数:
- input_docx_path: 输入 DOCX 文件名称
- replacements: 文本替换规则列表,例如:
[
{\"old\": \"旧标题\", \"new\": \"<span color='#FF0000'>新标题</span>\"},
{\"old\": \"原文\", \"new\": \"新文\"}
{"old": "计划作业总数共有10项。", "new": "计划作业总数共有<span color='red'>XX</span>项。"},
{"old": "文档原文本,必须是完整的一句话或者段落", "new": "要替换的文本"}
]
- image_replacements: 图片替换规则列表,例如:
[
{\"index\": 1, \"file\": \"new_chart.png\"},
{\"index\": 2, \"file\": \"new_photo.jpg\"}
]
其中 file 字段同样可以是本地路径或 HTTP/HTTPS URL。
- image_replacements: 图片替换规则
- report_type: 报告类型,可选值:日报 / 周报 / 月报(或对应的英文 daily / weekly / monthly
- report_title_time: 报告标题中要显示的时间字符串用来替换“YYYY年M月”这一段仅在第一次匹配时生效
返回:
- {
\"output_path\": 生成的 DOCX 绝对路径,
\"output_url\": 如果配置了 MCP_OUTPUT_BASE_URL则为可访问该文件的 URL否则为 null
"output_path": 生成的 DOCX 绝对路径,
"output_url": 如果配置了 MCP_OUTPUT_BASE_URL则为可访问该文件的 URL否则为 null
}
"""
tmp_input: Optional[str] = None
tmp_images: List[str] = []
print(f"edit_docx: input_docx_path: {input_docx_path}, replacements: {replacements}, image_replacements: {image_replacements}")
try:
upload_dir = _get_upload_dir()
local_input = os.path.join(upload_dir, input_docx_path)
upload_dir = _get_upload_dir() # 输出目录:/uploads
tmp_upload_dir = _get_tmp_upload_dir() # 上传临时目录:/tmp
# 解析输入路径:支持 URL、绝对路径、仅文件名三种形式
local_input = input_docx_path
if _is_url(input_docx_path):
parsed = urllib.parse.urlparse(input_docx_path)
ext = os.path.splitext(parsed.path)[1] or ".docx"
tmp_input = _download_to_temp(input_docx_path, suffix=ext)
local_input = tmp_input
elif not os.path.isabs(local_input):
# 相对路径:优先在 tmp其次在 uploads 中查找
cand_tmp = os.path.join(tmp_upload_dir, input_docx_path)
cand_upload = os.path.join(upload_dir, input_docx_path)
if os.path.exists(cand_tmp):
local_input = cand_tmp
else:
local_input = cand_upload
if not os.path.exists(local_input):
raise FileNotFoundError(f"输入 DOCX 文件不存在: {input_docx_path}")
@@ -260,8 +418,14 @@ async def edit_docx(input_docx_path: str, replacements: Optional[List[Dict[str,
img_pairs.append((idx, local_img))
# 复用原始处理函数
output_docx = local_input.replace(".docx", "_output.docx")
# 复用原始处理函数
# 输出文件统一写入 /uploads 目录,文件名带时间戳和随机后缀避免并发冲突
base_name = os.path.basename(local_input)
name_root, _ = os.path.splitext(base_name)
ts = datetime.now().strftime('%Y%m%d%H%M%S')
rand = uuid.uuid4().hex[:6]
output_filename = f"{name_root}_output_{ts}_{rand}.docx"
output_docx = os.path.join(upload_dir, output_filename)
process(
input_docx=local_input,
output_docx=output_docx,
@@ -270,7 +434,30 @@ async def edit_docx(input_docx_path: str, replacements: Optional[List[Dict[str,
color_keywords=color_keywords,
)
# 追加:根据报告类型与标题时间,在“目录”之前自动处理日期和期数
if report_type or report_title_time:
try:
_apply_report_date_logic_to_docx(
output_docx,
report_type=report_type,
report_title_time=report_title_time,
)
except Exception as e:
# 避免因为日期处理失败而导致整个接口报错,把错误写到日志即可
print(f"apply report date logic failed: {e}")
abs_out = os.path.abspath(output_docx)
# 删除上传的临时文件:只删除位于 tmp 目录中的输入文件
try:
tmp_root = _get_tmp_upload_dir()
if os.path.exists(local_input):
abs_input = os.path.abspath(local_input)
if os.path.commonpath([abs_input, tmp_root]) == tmp_root:
os.remove(local_input)
except Exception:
# 不因清理失败影响主流程
pass
return {
"output_path": output_docx,
"output_url": _build_output_url(output_docx),
@@ -291,6 +478,61 @@ async def edit_docx(input_docx_path: str, replacements: Optional[List[Dict[str,
# HTTP 远程模式:添加文件上传下载路由
from starlette.responses import FileResponse, JSONResponse
from starlette.requests import Request
def _get_log_path() -> str:
"""
获取日志文件路径。
优先使用环境变量 MCP_LOG_FILE完整路径否则使用当前目录下的 logs/mcp.log。
"""
log_file = os.getenv("MCP_LOG_FILE", "./logs/mcp.log")
log_path = os.path.abspath(log_file)
os.makedirs(os.path.dirname(log_path), exist_ok=True)
return log_path
@mcp.custom_route("/log", methods=["POST"])
async def append_log(request: Request):
    """
    Append the raw request body to the log file, one timestamped line each.

    The body is read as raw bytes (not JSON) and decoded as UTF-8; bytes that
    are not valid UTF-8 are replaced rather than rejected. A body containing
    several lines is written as several log lines, each carrying the same
    timestamp prefix, so a caller cannot inject lines that look like separate
    untimestamped entries.

    Returns a JSON response:
    - 400 ``{"success": False, "message": ...}`` when the body is empty;
    - 200 ``{"success": True, "log_path": ..., "message": ...}`` on success;
    - 500 ``{"success": False, "log_path": ..., "message": ...}`` on any
      failure; ``log_path`` is null if the path itself cannot be resolved.
    """
    log_path = None
    try:
        data = await request.body()
        if not data:
            return JSONResponse(
                {
                    "success": False,
                    "message": "未提供消息内容",
                },
                status_code=400,
            )

        log_path = _get_log_path()
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # The body is untrusted: tolerate bad encodings instead of failing.
        text = data.decode("utf-8", errors="replace")
        # Prefix every line so each written line has its own timestamp.
        entries = text.splitlines() or [""]

        with open(log_path, "a", encoding="utf-8") as f:
            f.writelines(f"[{ts}] {entry}\n" for entry in entries)

        return JSONResponse(
            {
                "success": True,
                "log_path": log_path,
                "message": "已写入日志",
            }
        )
    except Exception as e:
        # Resolving the path may be what failed; never raise from the handler.
        if log_path is None:
            try:
                log_path = _get_log_path()
            except OSError:
                pass
        return JSONResponse(
            {
                "success": False,
                "log_path": log_path,
                "message": f"写入日志失败: {str(e)}",
            },
            status_code=500,
        )
@mcp.custom_route("/upload", methods=["POST"])
async def upload_handler(request: Request):
"""处理文件上传"""
@@ -304,31 +546,23 @@ async def upload_handler(request: Request):
"message": "未提供文件"
}, status_code=400)
upload_dir = _get_upload_dir()
filename = file.filename
tmp_dir = _get_tmp_upload_dir()
orig_filename = file.filename or "uploaded.docx"
# 安全检查:防止路径遍历攻击
filename = os.path.basename(filename)
file_path = os.path.join(upload_dir, filename)
# 安全检查:防止路径遍历攻击,保留原始文件名
filename = os.path.basename(orig_filename)
file_path = os.path.join(tmp_dir, filename)
# 如果文件已存在,添加序号
base, ext = os.path.splitext(filename)
counter = 1
while os.path.exists(file_path):
filename = f"{base}_{counter}{ext}"
file_path = os.path.join(upload_dir, filename)
counter += 1
# 保存文件
# 保存文件到临时目录(如已存在则覆盖)
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
return JSONResponse({
"success": True,
"filename": filename,
"file_path": file_path,
"file_url": _build_output_url(file_path),
"filename": filename, # 保留原始文件名,供 edit_docx 使用
"file_path": file_path, # 绝对路径(可选)
"file_url": None, # 临时文件不提供下载 URL
"size": len(content),
"message": f"文件上传成功: {filename}"
})