Compare commits

...

4 Commits

Author SHA1 Message Date
ac66c5dd08 add:同时upload和edit时,使用etag进行版本标记 2026-04-02 10:52:59 +08:00
0d178c748e add:删除data标签 2026-04-02 10:43:45 +08:00
5ab2f2b76f docker忽略 2026-03-26 22:59:03 +08:00
2bb850e8e2 1.0 2026-03-26 22:56:58 +08:00
3 changed files with 645 additions and 268 deletions

27
.dockerignore Normal file
View File

@@ -0,0 +1,27 @@
# Git
.git
.gitignore
# Python 缓存与虚拟环境
__pycache__/
*.py[cod]
*$py.class
.venv/
venv/
env/
# 运行时/本地数据(镜像内不需要,compose 可挂载 uploads)
tmp/
uploads/
logs/
# 临时目录与杂项
.tmp-*
*.log
.DS_Store
Thumbs.db
test.py
erom-report-service.tar
test*
erom-report-service.tar.backup

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
docx_editor.py — 保留原格式替换文本 + 修改字体颜色 + 替换图片 docx_editor.py — 保留原格式替换文本 + 修改字体颜色
用法: 用法:
# 列出文档中所有图片 # 列出文档中所有图片
@@ -10,25 +10,14 @@ docx_editor.py — 保留原格式替换文本 + 修改字体颜色 + 替换图
python3 docx_editor.py input.docx output.docx \ python3 docx_editor.py input.docx output.docx \
--replace "原文" "新文" \ --replace "原文" "新文" \
--color "关键词" "FF0000" --color "关键词" "FF0000"
# 图片替换按文档中出现的顺序从1开始
python3 docx_editor.py input.docx output.docx \
--image 1 new_chart.png \
--image 2 new_photo.jpg
# 同时替换文字和图片
python3 docx_editor.py input.docx output.docx \
--replace "旧标题" "新标题" \
--image 1 new_image.png \
--color "重点" "FF0000"
""" """
import argparse import argparse
import copy
import os import os
import tempfile import tempfile
import zipfile import zipfile
from lxml import etree from lxml import etree
from PIL import Image
import re import re
W = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main' W = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'
@@ -37,12 +26,6 @@ A = 'http://schemas.openxmlformats.org/drawingml/2006/main'
R = 'http://schemas.openxmlformats.org/officeDocument/2006/relationships' R = 'http://schemas.openxmlformats.org/officeDocument/2006/relationships'
REL_TYPE_IMAGE = 'http://schemas.openxmlformats.org/officeDocument/2006/relationships/image' REL_TYPE_IMAGE = 'http://schemas.openxmlformats.org/officeDocument/2006/relationships/image'
EXT_TO_MIME = {
'png': 'image/png', 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg',
'gif': 'image/gif', 'bmp': 'image/bmp', 'tiff': 'image/tiff',
'webp': 'image/webp',
}
def unpack(docx_path, out_dir): def unpack(docx_path, out_dir):
"""使用 zipfile 直接解包 .docx 到临时目录,替代外部 unpack.py 脚本。""" """使用 zipfile 直接解包 .docx 到临时目录,替代外部 unpack.py 脚本。"""
@@ -141,61 +124,146 @@ def get_images_info(docx_path):
return build_image_index(tmpdir) return build_image_index(tmpdir)
def replace_image(unpacked_dir, index, new_image_path): def _normalize_newlines(text):
"""替换第 index 张图片1-based""" if text is None:
imgs = build_image_index(unpacked_dir) return ''
if index < 1 or index > len(imgs): return str(text).replace('\r\n', '\n').replace('\r', '\n')
raise ValueError(f"图片序号 {index} 超出范围(共 {len(imgs)} 张)")
info = imgs[index - 1]
old_abs = info['abs_path']
old_ext = info['ext']
new_ext = os.path.splitext(new_image_path)[1].lstrip('.').lower()
if new_ext == 'jpg':
new_ext = 'jpeg'
print(f" 图片#{index} {os.path.basename(info['media_file'])}({old_ext.upper()})" def _is_text_node(el):
f"{os.path.basename(new_image_path)}({new_ext.upper()})") return el.tag == f'{{{W}}}t'
if old_ext == new_ext:
# ── 同格式:直接覆盖 ──────────────────────────────
import shutil
shutil.copy2(new_image_path, old_abs)
def _is_break_node(el):
    """Report whether *el* is a ``<w:br>`` or ``<w:cr>`` element."""
    # NOTE(review): only the tag is inspected; the ``w:type`` attribute of
    # ``<w:br>`` is not, so page/column breaks are treated like line breaks
    # here as well -- confirm that is intended.
    tag = el.tag
    return tag == f'{{{W}}}br' or tag == f'{{{W}}}cr'
def _is_tab_node(el):
    """Report whether *el* is a ``<w:tab>`` element."""
    tab_tag = f'{{{W}}}tab'
    return el.tag == tab_tag
def _iter_run_text_parts(run_el):
    """Yield ``(child, text)`` for each text-like direct child of a run.

    ``<w:t>`` children yield their newline-normalized text, break children
    yield ``'\\n'`` and tab children yield ``'\\t'``. Any other child is
    skipped.
    """
    for node in run_el:
        if _is_text_node(node):
            piece = _normalize_newlines(node.text or '')
        elif _is_break_node(node):
            piece = '\n'
        elif _is_tab_node(node):
            piece = '\t'
        else:
            continue
        yield node, piece
def _run_text(run_el):
    """Return the concatenated text of a run's text-like children."""
    pieces = [text for _node, text in _iter_run_text_parts(run_el)]
    return ''.join(pieces)
def _paragraph_text(para_el):
    """Return the text of every ``<w:r>`` found anywhere below *para_el*.

    Runs are visited in the order ``para_el.iter`` produces them and each
    run's text is appended to the result.
    """
    run_tag = f'{{{W}}}r'
    chunks = []
    for run in para_el.iter(run_tag):
        chunks.append(_run_text(run))
    return ''.join(chunks)
def _clear_run_text_like_children(run_el):
    """Remove every text, break and tab child from *run_el* in place."""
    # Collect first so the run is not mutated while it is being iterated.
    doomed = [
        node for node in run_el
        if _is_text_node(node) or _is_break_node(node) or _is_tab_node(node)
    ]
    for node in doomed:
        run_el.remove(node)
def _append_text_to_run(run_el, text):
    """Append *text* to *run_el* as WordprocessingML children.

    The text is newline-normalized first. Each ``'\\n'`` becomes a ``<w:br/>``
    and each ``'\\t'`` becomes a ``<w:tab/>``, mirroring how
    ``_iter_run_text_parts`` reads those elements back, so a read/modify/write
    cycle no longer flattens tab elements into literal tab characters inside
    ``<w:t>``. Every non-empty stretch between those separators becomes its
    own ``<w:t>``; ``xml:space="preserve"`` is set when the stretch starts or
    ends with a space.

    Empty *text* still produces a single empty ``<w:t>``, as before.
    """
    text = _normalize_newlines(text)

    def _add_text(chunk):
        t_el = etree.SubElement(run_el, f'{{{W}}}t')
        t_el.text = chunk
        if chunk and (chunk[0] == ' ' or chunk[-1] == ' '):
            t_el.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve')

    if not text:
        _add_text('')
        return

    # The capturing group keeps the separators in the split result so they
    # can be emitted as elements in their original positions.
    for token in re.split(r'([\n\t])', text):
        if token == '\n':
            etree.SubElement(run_el, f'{{{W}}}br')
        elif token == '\t':
            etree.SubElement(run_el, f'{{{W}}}tab')
        elif token:
            _add_text(token)
def _ensure_paragraph_run(para_el):
runs = list(para_el.findall(f'.//{{{W}}}r'))
if runs:
return runs[0]
ppr = para_el.find(f'{{{W}}}pPr')
new_r = etree.Element(f'{{{W}}}r')
if ppr is None:
para_el.insert(0, new_r)
else: else:
# ── 不同格式Pillow 转换 + 更新 rels + ContentTypes para_el.insert(para_el.index(ppr) + 1, new_r)
new_abs = os.path.splitext(old_abs)[0] + '.' + new_ext return new_r
img = Image.open(new_image_path)
fmt = {'jpeg': 'JPEG', 'png': 'PNG', 'gif': 'GIF',
'bmp': 'BMP', 'tiff': 'TIFF', 'webp': 'WEBP'}.get(new_ext, new_ext.upper())
if fmt == 'JPEG' and img.mode in ('RGBA', 'P'):
img = img.convert('RGB')
img.save(new_abs, format=fmt)
if os.path.abspath(new_abs) != os.path.abspath(old_abs):
os.remove(old_abs)
# 更新 rels
old_media = info['media_file'] def _set_paragraph_text(para_el, text):
new_media = os.path.splitext(old_media)[0] + '.' + new_ext runs = list(para_el.findall(f'.//{{{W}}}r'))
word_dir = os.path.join(unpacked_dir, 'word') text_runs = [run for run in runs if any(True for _ in _iter_run_text_parts(run))]
rels_path = os.path.join(word_dir, '_rels', 'document.xml.rels')
rels_tree = etree.parse(rels_path) if text_runs:
for rel in rels_tree.getroot(): first_run = text_runs[0]
if rel.get('Id') == info['rid']: for run in text_runs:
rel.set('Target', new_media) _clear_run_text_like_children(run)
else:
first_run = _ensure_paragraph_run(para_el)
_clear_run_text_like_children(first_run)
_append_text_to_run(first_run, text)
def _paragraph_list(doc_el):
    """Return a list of every ``<w:p>`` produced by ``doc_el.iter``."""
    para_tag = f'{{{W}}}p'
    return [*doc_el.iter(para_tag)]
def _replace_paragraph_block(doc_el, old_text, new_text):
old_segments = _normalize_newlines(old_text).split('\n\n')
new_segments = _normalize_newlines(new_text).split('\n\n')
if len(old_segments) <= 1:
return False
paras = _paragraph_list(doc_el)
para_texts = [_paragraph_text(p) for p in paras]
match_start = None
for i in range(0, len(para_texts) - len(old_segments) + 1):
if para_texts[i:i + len(old_segments)] == old_segments:
match_start = i
break break
rels_tree.write(rels_path, xml_declaration=True, encoding='UTF-8', standalone=True)
# 更新 ContentTypes if match_start is None:
ct_path = os.path.join(unpacked_dir, '[Content_Types].xml') return False
ct_tree = etree.parse(ct_path)
ct_root = ct_tree.getroot() matched_paras = paras[match_start:match_start + len(old_segments)]
existing = {el.get('Extension', '') for el in ct_root} parent = matched_paras[0].getparent()
if new_ext not in existing: if parent is None:
etree.SubElement(ct_root, 'Default', Extension=new_ext, return False
ContentType=EXT_TO_MIME.get(new_ext, f'image/{new_ext}'))
ct_tree.write(ct_path, xml_declaration=True, encoding='UTF-8', standalone=True) anchor_index = parent.index(matched_paras[-1])
print(f" 格式转换 {old_ext}{new_ext}rels 和 ContentTypes 已更新")
shared_count = min(len(matched_paras), len(new_segments))
for idx in range(shared_count):
_set_paragraph_text(matched_paras[idx], new_segments[idx])
if len(new_segments) > len(matched_paras):
template_para = matched_paras[-1]
insert_at = anchor_index + 1
for seg in new_segments[len(matched_paras):]:
new_para = copy.deepcopy(template_para)
_set_paragraph_text(new_para, seg)
parent.insert(insert_at, new_para)
insert_at += 1
elif len(new_segments) < len(matched_paras):
for para in matched_paras[len(new_segments):]:
para_parent = para.getparent()
if para_parent is not None:
para_parent.remove(para)
return True
def paragraph_replace(para_el, replacements): def paragraph_replace(para_el, replacements):
@@ -213,20 +281,27 @@ def paragraph_replace(para_el, replacements):
return return
# 收集所有文本元素及其位置信息 # 收集所有文本元素及其位置信息
t_elements = [] text_runs = []
for run in runs: for run in runs:
for t_el in run.findall(f'{{{W}}}t'): if any(True for _ in _iter_run_text_parts(run)):
t_elements.append((run, t_el)) text_runs.append(run)
if not t_elements: if not text_runs:
return return
# 拼接完整文本 # 拼接完整文本
full_text = ''.join(t_el.text or '' for _, t_el in t_elements) full_text = _paragraph_text(para_el)
original_text = full_text original_text = full_text
# 执行所有替换 normalized_replacements = []
for old, new in replacements: for old, new in replacements:
normalized_replacements.append((
_normalize_newlines(old),
_normalize_newlines(new),
))
# 执行所有替换
for old, new in normalized_replacements:
if old in full_text: if old in full_text:
full_text = full_text.replace(old, new) full_text = full_text.replace(old, new)
@@ -236,16 +311,11 @@ def paragraph_replace(para_el, replacements):
print(f"段落替换: {len(original_text)} -> {len(full_text)} 字符") print(f"段落替换: {len(original_text)} -> {len(full_text)} 字符")
# 将文本重新分配到原有的 <w:t> 元素中 # 将规范化文本重新写回第一个文本 run\n 会回写成 Word 的换行节点。
# 策略:将所有文本放入第一个元素,清空其他元素,避免不当切分导致换行 first_run = text_runs[0]
_, first_t_el = t_elements[0] for run in text_runs:
first_t_el.text = full_text _clear_run_text_like_children(run)
if full_text and (full_text[0] == ' ' or full_text[-1] == ' '): _append_text_to_run(first_run, full_text)
first_t_el.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve')
# 清空其他 <w:t> 元素
for i in range(1, len(t_elements)):
t_elements[i][1].text = ''
def ensure_rpr(run_el): def ensure_rpr(run_el):
@@ -271,13 +341,15 @@ def apply_color_to_keyword(doc_el, keyword, hex_color, context_text=None):
当 context_text 不为空时,只在“整段文本包含该 context_text 的段落”中进行上色, 当 context_text 不为空时,只在“整段文本包含该 context_text 的段落”中进行上色,
避免同一个关键字在其他段落里被误伤(例如单独的数字 0 避免同一个关键字在其他段落里被误伤(例如单独的数字 0
""" """
keyword = _normalize_newlines(keyword)
context_text = _normalize_newlines(context_text) if context_text is not None else None
# 如果提供了上下文,只在包含该上下文的段落内着色 # 如果提供了上下文,只在包含该上下文的段落内着色
allowed_paras = None allowed_paras = None
if context_text: if context_text:
allowed_paras = set() allowed_paras = set()
for p in doc_el.iter(f'{{{W}}}p'): for p in doc_el.iter(f'{{{W}}}p'):
t_nodes = list(p.iter(f'{{{W}}}t')) full = _paragraph_text(p)
full = ''.join(t.text or '' for t in t_nodes)
if context_text in full: if context_text in full:
allowed_paras.add(p) allowed_paras.add(p)
@@ -294,10 +366,9 @@ def apply_color_to_keyword(doc_el, keyword, hex_color, context_text=None):
para = _find_ancestor_para(run) para = _find_ancestor_para(run)
if para not in allowed_paras: if para not in allowed_paras:
continue continue
t_nodes = list(run.findall(f'{{{W}}}t')) full_text = _run_text(run)
if not t_nodes: if not full_text:
continue continue
full_text = ''.join(t.text or '' for t in t_nodes)
if keyword not in full_text: if keyword not in full_text:
continue continue
@@ -317,10 +388,7 @@ def apply_color_to_keyword(doc_el, keyword, hex_color, context_text=None):
new_r = etree.Element(f'{{{W}}}r') new_r = etree.Element(f'{{{W}}}r')
if rpr_bytes is not None: if rpr_bytes is not None:
new_r.append(etree.fromstring(rpr_bytes)) new_r.append(etree.fromstring(rpr_bytes))
t_el = etree.SubElement(new_r, f'{{{W}}}t') _append_text_to_run(new_r, text)
t_el.text = text
if text and (text[0] == ' ' or text[-1] == ' '):
t_el.set('{http://www.w3.org/XML/1998/namespace}space', 'preserve')
if colored: if colored:
set_color_on_rpr(ensure_rpr(new_r), hex_color) set_color_on_rpr(ensure_rpr(new_r), hex_color)
return new_r return new_r
@@ -349,7 +417,7 @@ def apply_color_to_keyword(doc_el, keyword, hex_color, context_text=None):
def remove_rule_blocks(doc_el): def remove_rule_blocks(doc_el):
""" """
删除文档中位于 <global_rule>...</global_rule>、<rule>...</rule><chart_rule>...</chart_rule> 之间的所有段落。 删除文档中位于 <global_rule>...</global_rule>、<rule>...</rule><chart_rule>...</chart_rule> 和 <data>...</data> 之间的所有段落。
说明: 说明:
- 标签内容可能跨段落,这里按段落顺序遍历,记录是否处于 rule 块内。 - 标签内容可能跨段落,这里按段落顺序遍历,记录是否处于 rule 块内。
@@ -359,6 +427,7 @@ def remove_rule_blocks(doc_el):
inside_global = False inside_global = False
inside_rule = False inside_rule = False
inside_chart = False inside_chart = False
inside_data = False
paras_to_delete = [] paras_to_delete = []
# list(...) 防止在遍历时修改树结构 # list(...) 防止在遍历时修改树结构
@@ -368,12 +437,12 @@ def remove_rule_blocks(doc_el):
if not full: if not full:
# 空段落如果在块内,也删掉 # 空段落如果在块内,也删掉
if inside_global or inside_rule or inside_chart: if inside_global or inside_rule or inside_chart or inside_data:
paras_to_delete.append(p) paras_to_delete.append(p)
continue continue
# 当前是否在某个块内 # 当前是否在某个块内
if inside_global or inside_rule or inside_chart: if inside_global or inside_rule or inside_chart or inside_data:
paras_to_delete.append(p) paras_to_delete.append(p)
# 检测 global_rule 块 # 检测 global_rule 块
@@ -400,24 +469,26 @@ def remove_rule_blocks(doc_el):
if '</chart_rule>' in full: if '</chart_rule>' in full:
inside_chart = False inside_chart = False
# 检测 data 块
if '<data>' in full:
inside_data = True
if p not in paras_to_delete:
paras_to_delete.append(p)
if '</data>' in full:
inside_data = False
for p in paras_to_delete: for p in paras_to_delete:
parent = p.getparent() parent = p.getparent()
if parent is not None: if parent is not None:
parent.remove(p) parent.remove(p)
def process(input_docx, output_docx, replacements, image_replacements, def process(input_docx, output_docx, replacements, color_keywords):
color_keywords):
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
print(f"📂 解包 {input_docx} ...") print(f"📂 解包 {input_docx} ...")
unpack(input_docx, tmpdir) unpack(input_docx, tmpdir)
doc_xml_path = os.path.join(tmpdir, 'word', 'document.xml') doc_xml_path = os.path.join(tmpdir, 'word', 'document.xml')
if image_replacements:
print(f"🖼️ 替换 {len(image_replacements)} 张图片...")
for idx, new_img in image_replacements:
replace_image(tmpdir, idx, new_img)
tree = etree.parse(doc_xml_path) tree = etree.parse(doc_xml_path)
root = tree.getroot() root = tree.getroot()
@@ -426,8 +497,17 @@ def process(input_docx, output_docx, replacements, image_replacements,
if replacements: if replacements:
print(f"✏️ 替换 {len(replacements)} 条文本...") print(f"✏️ 替换 {len(replacements)} 条文本...")
remaining_replacements = []
for old, new in replacements:
if '\n\n' in _normalize_newlines(old):
replaced = _replace_paragraph_block(root, old, new)
if replaced:
print("🧩 跨段替换命中")
continue
remaining_replacements.append((old, new))
if remaining_replacements:
for para in root.iter(f'{{{W}}}p'): for para in root.iter(f'{{{W}}}p'):
paragraph_replace(para, replacements) paragraph_replace(para, remaining_replacements)
# 根据 span 解析出的关键字上色 # 根据 span 解析出的关键字上色
for item in color_keywords: for item in color_keywords:
@@ -457,6 +537,8 @@ def _parse_span_replacement(new_text):
""" """
import re import re
new_text = _normalize_newlines(new_text)
# 简单的命名颜色到 16 进制的映射,可按需扩展 # 简单的命名颜色到 16 进制的映射,可按需扩展
named_colors = { named_colors = {
'red': 'FF0000', 'red': 'FF0000',
@@ -505,33 +587,33 @@ def _parse_span_replacement(new_text):
re.IGNORECASE | re.DOTALL, re.IGNORECASE | re.DOTALL,
) )
# 先得到去掉 span 标签后的纯文本(也是最终会写入 DOCX 的内容) # 先按段落边界拆分,这样 span 上色时可以使用所在段落作为上下文。
def _strip_repl(m): def _strip_repl(m):
return m.group(2) return m.group(2)
plain_text = span_pattern.sub(_strip_repl, new_text) plain_segments = []
# 再次遍历 span收集颜色关键字并把“整句纯文本”作为上下文挂在每个关键字上
color_keywords = [] color_keywords = []
for m in span_pattern.finditer(new_text): for segment in new_text.split('\n\n'):
plain_segment = span_pattern.sub(_strip_repl, segment)
plain_segments.append(plain_segment)
for m in span_pattern.finditer(segment):
raw_color = m.group(1) raw_color = m.group(1)
hex_color = _normalize_color(raw_color) hex_color = _normalize_color(raw_color)
keyword = m.group(2) keyword = m.group(2)
# 三元组: (关键字, 颜色, 该 NEW 对应的整句纯文本上下文) # 三元组: (关键字, 颜色, 所在段落的纯文本上下文)
color_keywords.append((keyword, hex_color, plain_text)) color_keywords.append((keyword, hex_color, plain_segment))
plain_text = '\n\n'.join(plain_segments)
return plain_text, color_keywords return plain_text, color_keywords
def main(): def main():
parser = argparse.ArgumentParser(description='DOCX 格式保留:替换文本/图片/颜色') parser = argparse.ArgumentParser(description='DOCX 格式保留:替换文本/颜色')
parser.add_argument('input', help='输入 .docx') parser.add_argument('input', help='输入 .docx')
parser.add_argument('output', nargs='?', help='输出 .docx') parser.add_argument('output', nargs='?', help='输出 .docx')
parser.add_argument('--list-images', action='store_true', help='列出所有图片') parser.add_argument('--list-images', action='store_true', help='列出所有图片')
parser.add_argument('--replace', nargs=2, metavar=('OLD', 'NEW'), parser.add_argument('--replace', nargs=2, metavar=('OLD', 'NEW'),
action='append', default=[]) action='append', default=[])
parser.add_argument('--image', nargs=2, metavar=('INDEX', 'FILE'),
action='append', default=[], help='图片替换')
args = parser.parse_args() args = parser.parse_args()
if args.list_images: if args.list_images:
@@ -549,11 +631,10 @@ def main():
color_keywords.extend(spans) color_keywords.extend(spans)
process( process(
input_docx = args.input, input_docx=args.input,
output_docx = args.output, output_docx=args.output,
replacements = replacements, replacements=replacements,
image_replacements= [(int(i), f) for i, f in args.image], color_keywords=color_keywords,
color_keywords = color_keywords,
) )
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -2,9 +2,9 @@
""" """
基于 mcp_docx.py 封装的 MCP 服务器。 基于 mcp_docx.py 封装的 MCP 服务器。
暴露两个主要 MCP 工具: 暴露主要 MCP 工具:
- list_docx_images列出 DOCX 中的图片信息 - list_docx_images列出 DOCX 中的图片信息
- edit_docx: 进行文本替换 / 关键字上色 / 图片替换 - edit_docx 进行文本替换 / 关键字上色(与 HTTP POST /edit_docx 能力一致)
额外提供 HTTP 文件接口(仅在 http 模式下可用): 额外提供 HTTP 文件接口(仅在 http 模式下可用):
- POST /upload: 上传文件到服务器 - POST /upload: 上传文件到服务器
@@ -29,12 +29,17 @@
""" """
import argparse import argparse
import hashlib
import json
import os import os
import shutil
import tempfile import tempfile
import time
import urllib.parse import urllib.parse
import zipfile
from contextlib import contextmanager
from datetime import datetime, date, timedelta from datetime import datetime, date, timedelta
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
import uuid
import requests import requests
from lxml import etree from lxml import etree
@@ -43,6 +48,7 @@ from mcp.server.transport_security import TransportSecuritySettings
from mcp_docx import ( from mcp_docx import (
W, W,
_normalize_newlines,
get_images_info, get_images_info,
process, process,
_parse_span_replacement, _parse_span_replacement,
@@ -50,8 +56,18 @@ from mcp_docx import (
unpack, unpack,
pack, pack,
) )
# HTTP 远程模式:添加文件上传下载路由
from starlette.responses import FileResponse, JSONResponse
from starlette.background import BackgroundTask
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse
_disable_dns_rebinding = os.getenv("MCP_DISABLE_HOST_CHECK") == "1" if os.name == "nt":
import msvcrt
else:
import fcntl
_disable_dns_rebinding = True
if _disable_dns_rebinding: if _disable_dns_rebinding:
# 参考 python-sdk 官方文档:关闭 DNS rebinding 防护(适合本地或已由外层网关做安全控制的环境) # 参考 python-sdk 官方文档:关闭 DNS rebinding 防护(适合本地或已由外层网关做安全控制的环境)
@@ -64,8 +80,8 @@ else:
# 如需通过网关 / 域名访问,可在这里追加 allowed_hosts / allowed_origins # 如需通过网关 / 域名访问,可在这里追加 allowed_hosts / allowed_origins
transport_security = TransportSecuritySettings( transport_security = TransportSecuritySettings(
enable_dns_rebinding_protection=True, enable_dns_rebinding_protection=True,
allowed_hosts=["localhost:*", "127.0.0.1:*", "192.168.10.101:*"], allowed_hosts=["localhost:*", "127.0.0.1:*", "192.168.1.13:*","10.150.172.13:*"],
allowed_origins=["http://localhost:*", "http://127.0.0.1:*","http://192.168.10.101:*"], allowed_origins=["http://localhost:*", "http://127.0.0.1:*","http://192.168.1.13:*","http://10.150.172.13:*"],
) )
@@ -234,6 +250,53 @@ def _download_to_temp(url: str, suffix: str = ".tmp") -> str:
return tmp_path return tmp_path
def _safe_filename(filename: Optional[str], default: str = "uploaded.docx") -> str:
    """Reduce *filename* to a bare file name that cannot escape a directory.

    The value is URL-decoded, stripped of any directory components and
    trimmed of surrounding whitespace.

    Args:
        filename: Client-supplied name; may be None, percent-encoded, or
            contain ``/`` or ``\\`` separated directory parts.
        default: Returned when nothing usable remains.

    Returns:
        The final path component, or *default* when the input is empty or
        reduces to ``""``, ``"."`` or ``".."``.
    """
    if not filename:
        return default
    decoded = urllib.parse.unquote(str(filename))
    # On POSIX, os.path.basename() does not treat "\" as a separator, so
    # "..\\..\\x.docx" would pass through whole; normalize it first.
    normalized = decoded.replace("\\", "/")
    safe_name = os.path.basename(normalized).strip()
    # "." and ".." are valid basenames but resolve to a directory once joined.
    if safe_name in ("", ".", ".."):
        return default
    return safe_name
def _filename_from_url(url: str, default: str = "uploaded.docx") -> str:
    """Infer a file name from *url*.

    The query parameters ``filename``, ``fileName`` and ``name`` are tried in
    that order; the first one present wins. Otherwise the last component of
    the URL path is used. The chosen value is passed through
    ``_safe_filename`` with *default* as the fallback.
    """
    parts = urllib.parse.urlparse(url)
    params = urllib.parse.parse_qs(parts.query)
    candidate = next(
        (params[key][0] for key in ("filename", "fileName", "name") if params.get(key)),
        None,
    )
    if candidate is None:
        candidate = os.path.basename(parts.path)
    return _safe_filename(candidate, default=default)
def _download_to_path(url: str, local_path: str) -> None:
    """Download *url* to *local_path*, replacing the target in one step.

    The body is streamed into a temporary file created in the target's
    directory and then moved over *local_path* with ``os.replace``. On any
    failure the temporary file is removed and the exception is re-raised.

    Raises:
        requests.HTTPError: when the server answers with an error status.
    """
    # With stream=True the connection stays open until the response is
    # closed, so use the response as a context manager to release it on
    # every path.
    with requests.get(url, stream=True, timeout=30) as resp:
        resp.raise_for_status()

        parent_dir = os.path.dirname(local_path) or "."
        os.makedirs(parent_dir, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(
            suffix=os.path.splitext(local_path)[1] or ".tmp",
            dir=parent_dir,
        )
        try:
            with os.fdopen(fd, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            os.replace(tmp_path, local_path)
        except Exception:
            # Best-effort cleanup; the original error is what matters.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
def _build_output_url(abs_output_path: str) -> Optional[str]: def _build_output_url(abs_output_path: str) -> Optional[str]:
""" """
构造输出文件的下载 URL。 构造输出文件的下载 URL。
@@ -245,18 +308,37 @@ def _build_output_url(abs_output_path: str) -> Optional[str]:
- 否则在 http 模式下: http://host:port/download/{filename} - 否则在 http 模式下: http://host:port/download/{filename}
- stdio 模式下: 返回 None - stdio 模式下: 返回 None
""" """
filename = os.path.basename(abs_output_path)
encoded_filename = urllib.parse.quote(filename)
def _append_filename(base_url: str) -> str:
parsed = urllib.parse.urlparse(base_url)
query = urllib.parse.parse_qsl(parsed.query, keep_blank_values=True)
for index, (key, _) in enumerate(query):
if key in ("filename", "fileName", "name"):
query[index] = (key, filename)
return urllib.parse.urlunparse(
parsed._replace(query=urllib.parse.urlencode(query))
)
if parsed.path.rstrip("/").endswith("/download"):
query.append(("filename", filename))
return urllib.parse.urlunparse(
parsed._replace(query=urllib.parse.urlencode(query))
)
return base_url.rstrip("/") + "/" + encoded_filename
# 优先使用环境变量 # 优先使用环境变量
base = os.getenv("MCP_OUTPUT_BASE_URL") base = os.getenv("MCP_OUTPUT_BASE_URL")
if base: if base:
filename = os.path.basename(abs_output_path) return _append_filename(base)
return base.rstrip("/") + "/" + filename
# 如果是 http 模式,自动构建下载 URL # 如果是 http 模式,自动构建下载 URL
if _server_config["transport"] == "http": if _server_config["transport"] == "http":
host = _server_config["host"] host = _server_config["host"]
port = _server_config["port"] port = _server_config["port"]
filename = os.path.basename(abs_output_path)
# 如果 host 是 0.0.0.0,尝试使用更具体的地址 # 如果 host 是 0.0.0.0,尝试使用更具体的地址
if host == "0.0.0.0": if host == "0.0.0.0":
# 优先使用环境变量指定的公网地址 # 优先使用环境变量指定的公网地址
@@ -267,7 +349,7 @@ def _build_output_url(abs_output_path: str) -> Optional[str]:
# 默认使用 localhost # 默认使用 localhost
host = "localhost" host = "localhost"
return f"http://{host}:{port}/download/{filename}" return _append_filename(f"http://{host}:{port}/download")
return None return None
@@ -294,6 +376,170 @@ def _get_tmp_upload_dir() -> str:
return os.path.abspath(tmp_dir) return os.path.abspath(tmp_dir)
def _get_lock_dir() -> str:
    """Return the ``.locks`` directory under the upload dir, creating it if needed."""
    path = os.path.join(_get_upload_dir(), ".locks")
    os.makedirs(path, exist_ok=True)
    return path
def _get_lock_path(target_path: str) -> str:
    """Build the lock-file path that corresponds to *target_path*.

    The name combines the target's sanitized base name with the SHA-256 of
    its absolute path, so the same target always maps to the same lock file.
    """
    resolved = os.path.abspath(target_path)
    label = _safe_filename(os.path.basename(resolved), default="file")
    fingerprint = hashlib.sha256(resolved.encode("utf-8")).hexdigest()
    lock_name = f"{label}.{fingerprint}.lock"
    return os.path.join(_get_lock_dir(), lock_name)
def _acquire_lock(handle) -> None:
    """Take an exclusive lock on the open file *handle*, waiting until it is held.

    An empty file first gets a single byte written to it; the handle is then
    rewound to offset 0 before locking. On Windows a 1-byte region is locked
    with ``msvcrt.locking`` and retried every 50 ms on ``OSError``; elsewhere
    ``fcntl.flock`` with ``LOCK_EX`` is used.
    """
    # presumably the byte exists so the 1-byte msvcrt region has something
    # to cover -- TODO confirm
    handle.seek(0, os.SEEK_END)
    is_empty = handle.tell() == 0
    if is_empty:
        handle.write(b"0")
        handle.flush()
    handle.seek(0)

    if os.name != "nt":
        fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
        return

    locked = False
    while not locked:
        try:
            msvcrt.locking(handle.fileno(), msvcrt.LK_LOCK, 1)
        except OSError:
            time.sleep(0.05)
        else:
            locked = True
def _release_lock(handle) -> None:
    """Release the lock taken on *handle*.

    The handle is rewound to offset 0 first; Windows unlocks the 1-byte
    region with ``msvcrt.locking``, other platforms call ``fcntl.flock`` with
    ``LOCK_UN``.
    """
    handle.seek(0)
    fd = handle.fileno()
    if os.name == "nt":
        msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
        return
    fcntl.flock(fd, fcntl.LOCK_UN)
@contextmanager
def _file_lock(target_path: str):
    """Context manager holding the exclusive lock associated with *target_path*.

    The lock file is opened in append/binary mode, locked for the duration of
    the ``with`` body and released (then closed) on exit, including when the
    body raises.
    """
    with open(_get_lock_path(target_path), "a+b") as lock_file:
        _acquire_lock(lock_file)
        try:
            yield
        finally:
            _release_lock(lock_file)
def _write_bytes_atomic(file_path: str, content: bytes) -> None:
    """Write *content* to *file_path* via a temporary file and ``os.replace``.

    The parent directory is created when missing. The temporary file lives in
    that same directory and reuses the target's extension (``.tmp`` when it
    has none). If writing or replacing fails, the temporary file is removed
    and the exception propagates.
    """
    target_dir = os.path.dirname(file_path) or "."
    os.makedirs(target_dir, exist_ok=True)

    extension = os.path.splitext(file_path)[1]
    fd, staging_path = tempfile.mkstemp(suffix=extension or ".tmp", dir=target_dir)
    try:
        with os.fdopen(fd, "wb") as out:
            out.write(content)
        os.replace(staging_path, file_path)
    except Exception:
        # Best-effort cleanup of the staging file before re-raising.
        try:
            os.remove(staging_path)
        except OSError:
            pass
        raise
def _snapshot_file(file_path: str) -> str:
    """Copy *file_path* into a new temporary file and return the copy's path.

    The copy keeps the source's extension (``.tmp`` when it has none). If the
    copy fails, the temporary file is removed and the exception propagates.
    """
    extension = os.path.splitext(file_path)[1] or ".tmp"
    fd, copy_path = tempfile.mkstemp(suffix=extension)
    try:
        with os.fdopen(fd, "wb") as dst:
            with open(file_path, "rb") as src:
                shutil.copyfileobj(src, dst)
    except Exception:
        # Do not leave a partial snapshot behind.
        try:
            os.remove(copy_path)
        except OSError:
            pass
        raise
    return copy_path
def _cleanup_temp_file(file_path: str) -> None:
    """Delete *file_path*, silently ignoring any ``OSError``."""
    try:
        os.unlink(file_path)
    except OSError:
        return
def _compute_file_etag(file_path: str) -> str:
    """Return the hex SHA-256 of the file's contents, read in 64 KiB chunks."""
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
# Internal version registry: the etag of each file after its most recent
# upload or edit_docx.
# All reads and writes must happen while holding that file's _file_lock, so no
# additional thread lock is used.
# NOTE(review): this is a plain module-level dict, so it is per-process; if the
# server ever runs as several processes the registry is not shared between
# them -- confirm the deployment model.
_file_etag_registry: Dict[str, str] = {}
def _register_etag(abs_path: str, etag: str) -> None:
    """Record *etag* as the known version of *abs_path* in the registry."""
    _file_etag_registry.update({abs_path: etag})
def _check_etag(abs_path: str) -> None:
    """Verify that the file on disk still matches its registered etag.

    Does nothing when no etag is registered for *abs_path*. Otherwise the
    file is hashed again and compared with the registered value.

    Raises:
        ValueError: when the current hash differs from the registered one.
    """
    expected = _file_etag_registry.get(abs_path)
    if expected:
        actual = _compute_file_etag(abs_path)
        if actual != expected:
            raise ValueError(
                f"文件已被其他操作修改(版本冲突),请确认最新上传后重试。"
                f"已知: {expected[:12]}…,当前: {actual[:12]}"
            )
def _validate_docx_file(file_path: str) -> None:
    """Check that *file_path* is an existing, non-empty ZIP container.

    Raises:
        FileNotFoundError: the path does not exist or is not a regular file.
        ValueError: the file is empty or is not recognized as a ZIP archive.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"输入 DOCX 文件不存在: {file_path}")
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"输入路径不是文件: {file_path}")

    if os.path.getsize(file_path) <= 0:
        raise ValueError(f"输入 DOCX 文件为空: {os.path.basename(file_path)}")

    looks_like_zip = zipfile.is_zipfile(file_path)
    if not looks_like_zip:
        raise ValueError(f"输入文件不是合法的 DOCX/ZIP: {os.path.basename(file_path)}")
def _resolve_edit_target_path(input_docx_path: str, upload_dir: str) -> str:
    """Map an edit input to the local path it should live at.

    - URL: ``upload_dir`` joined with the file name inferred from the URL.
      This function only computes the path; it does not download anything.
    - Absolute path: returned unchanged.
    - Anything else: treated as a file name inside ``upload_dir`` after
      sanitizing.
    """
    if _is_url(input_docx_path):
        name = _filename_from_url(input_docx_path)
    elif os.path.isabs(input_docx_path):
        return input_docx_path
    else:
        name = _safe_filename(input_docx_path)
    return os.path.join(upload_dir, name)
@mcp.tool() @mcp.tool()
async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]: async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
""" """
@@ -313,72 +559,37 @@ async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
imgs = get_images_info(_download_to_temp(docx_url, suffix=".docx")) imgs = get_images_info(_download_to_temp(docx_url, suffix=".docx"))
return imgs return imgs
@mcp.custom_route("/edit_docx", methods=["POST"])
async def edit_docx_handler(request: Request): def _edit_docx_core(
data = await request.json() input_docx_path: str,
input_docx_path = data.get("input_docx_path") replacements: Optional[List[Dict[str, Any]]],
replacements = data.get("replacements") report_type: Optional[str],
image_replacements = data.get("image_replacements") report_title_time: Optional[str],
report_type = data.get("report_type") ) -> Dict[str, Any]:
report_title_time = data.get("report_title_time")
""" """
对 DOCX 文件进行编辑。 对 DOCX 文件进行编辑(与 HTTP /edit_docx 共用逻辑)
支持:
- 纯文本替换
- 通过 <span color="red">关键字</span> 语法设置关键字颜色
- 替换指定序号的图片
- 报告日期与期数自动替换(仅在“目录”之前生效)
参数:
- input_docx_path: 输入 DOCX 文件名称
- replacements: 文本替换规则列表,例如:
[
{"old": "计划作业总数共有10项。", "new": "计划作业总数共有<span color='red'>XX</span>项。"},
{"old": "文档原文本,必须是完整的一句话或者段落", "new": "要替换的文本"}
]
- image_replacements: 图片替换规则
- report_type: 报告类型,可选值:日报 / 周报 / 月报(或对应的英文 daily / weekly / monthly
- report_title_time: 报告标题中要显示的时间字符串用来替换“YYYY年M月”这一段仅在第一次匹配时生效
返回: 返回:
- { - {"output_path": 绝对路径, "output_url": URL 或 None, "etag": 新文件哈希}
"output_path": 生成的 DOCX 绝对路径,
"output_url": 如果配置了 MCP_OUTPUT_BASE_URL则为可访问该文件的 URL否则为 null
}
""" """
tmp_input: Optional[str] = None print(f"edit_docx: input_docx_path: {input_docx_path}, replacements: {replacements}")
tmp_images: List[str] = [] upload_dir = _get_upload_dir()
print(f"edit_docx: input_docx_path: {input_docx_path}, replacements: {replacements}, image_replacements: {image_replacements}") local_input = _resolve_edit_target_path(input_docx_path, upload_dir)
lock_cm = _file_lock(local_input)
lock_cm.__enter__()
try: try:
upload_dir = _get_upload_dir() # 输出目录:/uploads
tmp_upload_dir = _get_tmp_upload_dir() # 上传临时目录:/tmp
# 解析输入路径:支持 URL、绝对路径、仅文件名三种形式
local_input = input_docx_path
if _is_url(input_docx_path): if _is_url(input_docx_path):
parsed = urllib.parse.urlparse(input_docx_path) _download_to_path(input_docx_path, local_input)
ext = os.path.splitext(parsed.path)[1] or ".docx"
tmp_input = _download_to_temp(input_docx_path, suffix=ext)
local_input = tmp_input
elif not os.path.isabs(local_input):
# 相对路径:优先在 tmp其次在 uploads 中查找
cand_tmp = os.path.join(tmp_upload_dir, input_docx_path)
cand_upload = os.path.join(upload_dir, input_docx_path)
if os.path.exists(cand_tmp):
local_input = cand_tmp
else:
local_input = cand_upload
if not os.path.exists(local_input): _validate_docx_file(local_input)
raise FileNotFoundError(f"输入 DOCX 文件不存在: {input_docx_path}")
# 版本校验:在锁内对比注册表 etag检测并发 upload 导致的版本冲突
_check_etag(os.path.abspath(local_input))
if replacements is None: if replacements is None:
replacements = [] replacements = []
if image_replacements is None:
image_replacements = []
# 解析文本替换与颜色关键字(复用 CLI 逻辑)
rep_pairs = [] rep_pairs = []
color_keywords = [] color_keywords = []
for item in replacements: for item in replacements:
@@ -386,55 +597,25 @@ async def edit_docx_handler(request: Request):
new_raw = item.get("new") new_raw = item.get("new")
if not old: if not old:
continue continue
old = _normalize_newlines(old)
if new_raw is None: if new_raw is None:
new_raw = "" new_raw = ""
else:
new_raw = _normalize_newlines(new_raw)
new_plain, spans = _parse_span_replacement(new_raw) new_plain, spans = _parse_span_replacement(new_raw)
rep_pairs.append((old, new_plain)) rep_pairs.append((old, new_plain))
color_keywords.extend(spans) color_keywords.extend(spans)
# 处理图片替换参数(支持本地路径或 URL parent_dir = os.path.dirname(local_input) or "."
img_pairs = [] fd, output_docx = tempfile.mkstemp(suffix=".docx", dir=parent_dir)
for item in image_replacements: os.close(fd)
try:
idx = int(item.get("index"))
except (TypeError, ValueError):
continue
path = item.get("file")
if not path:
continue
local_img = path
if _is_url(path):
parsed = urllib.parse.urlparse(path)
ext = os.path.splitext(parsed.path)[1] or ""
suffix = ext if ext else ".img"
tmp_img = _download_to_temp(path, suffix=suffix)
tmp_images.append(tmp_img)
local_img = tmp_img
if not os.path.exists(local_img):
raise FileNotFoundError(f"图片文件不存在: {path}")
img_pairs.append((idx, local_img))
# 复用原始处理函数:
# 输出文件统一写入 /uploads 目录,文件名带时间戳和随机后缀避免并发冲突
base_name = os.path.basename(local_input)
name_root, _ = os.path.splitext(base_name)
ts = datetime.now().strftime('%Y%m%d%H%M%S')
rand = uuid.uuid4().hex[:6]
output_filename = f"{name_root}_output_{ts}_{rand}.docx"
output_docx = os.path.join(upload_dir, output_filename)
process( process(
input_docx=local_input, input_docx=local_input,
output_docx=output_docx, output_docx=output_docx,
replacements=rep_pairs, replacements=rep_pairs,
image_replacements=img_pairs,
color_keywords=color_keywords, color_keywords=color_keywords,
) )
# 追加:根据报告类型与标题时间,在“目录”之前自动处理日期和期数
if report_type or report_title_time: if report_type or report_title_time:
try: try:
_apply_report_date_logic_to_docx( _apply_report_date_logic_to_docx(
@@ -443,41 +624,98 @@ async def edit_docx_handler(request: Request):
report_title_time=report_title_time, report_title_time=report_title_time,
) )
except Exception as e: except Exception as e:
# 避免因为日期处理失败而导致整个接口报错,把错误写到日志即可
print(f"apply report date logic failed: {e}") print(f"apply report date logic failed: {e}")
abs_out = os.path.abspath(output_docx) os.replace(output_docx, local_input)
abs_out = os.path.abspath(local_input)
new_etag = _compute_file_etag(abs_out)
_register_etag(abs_out, new_etag)
# 删除上传的临时文件:只删除位于 tmp 目录中的输入文件
try:
tmp_root = _get_tmp_upload_dir()
if os.path.exists(local_input):
abs_input = os.path.abspath(local_input)
if os.path.commonpath([abs_input, tmp_root]) == tmp_root:
os.remove(local_input)
except Exception:
# 不因清理失败影响主流程
pass
return { return {
"output_path": output_docx, "output_path": abs_out,
"output_url": _build_output_url(output_docx), "output_url": _build_output_url(abs_out),
"etag": new_etag,
} }
except Exception:
if 'output_docx' in locals() and os.path.exists(output_docx):
try:
os.remove(output_docx)
except OSError:
pass
raise
finally: finally:
if tmp_input and os.path.exists(tmp_input): lock_cm.__exit__(None, None, None)
try:
os.remove(tmp_input)
except OSError:
pass
for p in tmp_images:
if os.path.exists(p): @mcp.tool()
def edit_docx(
input_docx_path: str,
replacements: List[Dict[str, Any]],
report_type: str,
report_title_time: str,
) -> Dict[str, Any]:
"""
对 DOCX 进行文本替换 / 关键字上色。
支持:
- 纯文本替换
- 通过 <span color="red">关键字</span> 语法设置关键字颜色
- 报告日期与期数自动替换(仅在“目录”之前生效)
参数:
- input_docx_path: 输入文件名
- replacements: 替换规则列表,例如
[{"old": "原文", "new": "<span color='red'>新文</span>"}]
- report_type: 日报/周报/月报 或 daily/weekly/monthly
- report_title_time: 替换标题中「YYYY年M月」为指定字符串首次匹配
返回:
- 成功: {"success": true, "output_path": ..., "output_url": ...}
- 失败: {"success": false, "message": "..."}
"""
try: try:
os.remove(p) out = _edit_docx_core(
except OSError: input_docx_path,
pass replacements,
# HTTP 远程模式:添加文件上传下载路由 report_type,
from starlette.responses import FileResponse, JSONResponse report_title_time,
from starlette.requests import Request )
return {"success": True, **out}
except Exception as e:
return {"success": False, "message": str(e)}
def _edit_docx_bad_json_response():
    """Build the 400 response returned when the request body is not a JSON object."""
    return JSONResponse(
        {
            "success": False,
            "message": (
                "请求体必须是合法的 JSON 对象。"
                "请使用 Content-Type: application/json并发送非空的 JSON body"
                "(空 body、form-data 或 urlencoded 会导致此错误)。"
            ),
        },
        status_code=400,
    )


@mcp.custom_route("/edit_docx", methods=["POST"])
async def edit_docx_handler(request: Request):
    """HTTP entry point for editing a DOCX file.

    Expects a JSON object body with the keys ``input_docx_path``,
    ``replacements``, ``report_type`` and ``report_title_time``; the values
    are forwarded unchanged to ``_edit_docx_core``.

    Responses:
        200: the mapping returned by ``_edit_docx_core`` plus ``"success": True``.
        400: the body is not decodable JSON, or is JSON but not an object.
        500: ``_edit_docx_core`` raised; ``message`` carries ``str(exc)``.
    """
    try:
        data = await request.json()
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError covers bodies that are not valid UTF-8/16/32;
        # it is not a JSONDecodeError and would otherwise surface as a 500.
        return _edit_docx_bad_json_response()

    # Valid JSON such as [] / "text" / 42 / null has no .get(); reject it with
    # the same 400 instead of letting AttributeError escape the handler.
    if not isinstance(data, dict):
        return _edit_docx_bad_json_response()

    input_docx_path = data.get("input_docx_path")
    replacements = data.get("replacements")
    report_type = data.get("report_type")
    report_title_time = data.get("report_title_time")

    try:
        result = _edit_docx_core(
            input_docx_path,
            replacements,
            report_type,
            report_title_time,
        )
        # Add the success flag so the success payload has the same shape as the
        # error payloads and as the edit_docx MCP tool's return value.
        return JSONResponse({"success": True, **result})
    except Exception as e:
        return JSONResponse({"success": False, "message": str(e)}, status_code=500)
def _get_log_path() -> str: def _get_log_path() -> str:
""" """
获取日志文件路径。 获取日志文件路径。
@@ -546,23 +784,30 @@ async def upload_handler(request: Request):
"message": "未提供文件" "message": "未提供文件"
}, status_code=400) }, status_code=400)
tmp_dir = _get_tmp_upload_dir() upload_dir = _get_upload_dir()
orig_filename = file.filename or "uploaded.docx" orig_filename = file.filename or "uploaded.docx"
# 安全检查:防止路径遍历攻击,保留原始文件名 # 安全检查:防止路径遍历攻击,保留原始文件名
filename = os.path.basename(orig_filename) filename = _safe_filename(orig_filename)
file_path = os.path.join(tmp_dir, filename) file_path = os.path.join(upload_dir, filename)
# 保存文件到临时目录(如已存在则覆盖) # 保存文件到 uploads 目录(如已存在则覆盖)
content = await file.read() content = await file.read()
with open(file_path, "wb") as f: if not content:
f.write(content) return JSONResponse({
"success": False,
"message": f"上传文件为空: {filename}"
}, status_code=400)
abs_file_path = os.path.abspath(file_path)
with _file_lock(file_path):
_write_bytes_atomic(file_path, content)
_register_etag(abs_file_path, _compute_file_etag(abs_file_path))
return JSONResponse({ return JSONResponse({
"success": True, "success": True,
"filename": filename, # 保留原始文件名,供 edit_docx 使用 "filename": filename, # 保留原始文件名,供 edit_docx 使用
"file_path": file_path, # 绝对路径(可选) "file_path": file_path, # 绝对路径(可选)
"file_url": None, # 临时文件不提供下载 URL "file_url": _build_output_url(file_path),
"size": len(content), "size": len(content),
"message": f"文件上传成功: {filename}" "message": f"文件上传成功: {filename}"
}) })
@@ -571,17 +816,36 @@ async def upload_handler(request: Request):
"success": False, "success": False,
"message": f"文件上传失败: {str(e)}" "message": f"文件上传失败: {str(e)}"
}, status_code=500) }, status_code=500)
@mcp.custom_route("/download", methods=["GET"])
@mcp.custom_route("/download/{filename}", methods=["GET"]) @mcp.custom_route("/download/{filename}", methods=["GET"])
async def download_handler(request: Request): async def download_handler(request: Request):
"""处理文件下载""" """处理文件下载"""
try: try:
filename = request.path_params.get("filename") filename = (
request.path_params.get("filename")
or request.query_params.get("filename")
or request.query_params.get("fileName")
or request.query_params.get("name")
)
upload_dir = _get_upload_dir() upload_dir = _get_upload_dir()
download_filename = (
request.query_params.get("download_filename")
or request.query_params.get("new_filename")
or request.query_params.get("rename_filename")
)
if not filename:
return JSONResponse({
"success": False,
"message": "缺少 filename 参数"
}, status_code=400)
# 安全检查:防止路径遍历攻击 # 安全检查:防止路径遍历攻击
filename = os.path.basename(filename) filename = _safe_filename(filename)
file_path = os.path.join(upload_dir, filename) file_path = os.path.join(upload_dir, filename)
lock_cm = _file_lock(file_path)
lock_cm.__enter__()
try:
if not os.path.exists(file_path): if not os.path.exists(file_path):
return JSONResponse({ return JSONResponse({
"success": False, "success": False,
@@ -594,10 +858,15 @@ async def download_handler(request: Request):
"message": f"不是文件: {filename}" "message": f"不是文件: {filename}"
}, status_code=400) }, status_code=400)
snapshot_path = _snapshot_file(file_path)
finally:
lock_cm.__exit__(None, None, None)
return FileResponse( return FileResponse(
file_path, snapshot_path,
filename=filename, filename=_safe_filename(download_filename, default=filename),
media_type="application/octet-stream" media_type="application/octet-stream",
background=BackgroundTask(_cleanup_temp_file, snapshot_path),
) )
except Exception as e: except Exception as e:
return JSONResponse({ return JSONResponse({