添加文件上传、下载接口

This commit is contained in:
2026-02-27 14:19:19 +08:00
parent 3d3d83502e
commit 61340758b3

View File

@@ -2,9 +2,11 @@
""" """
基于 mcp_docx.py 封装的 MCP 服务器。 基于 mcp_docx.py 封装的 MCP 服务器。
暴露个主要工具: 暴露个主要工具:
- list_docx_images列出 DOCX 中的图片信息 - list_docx_images列出 DOCX 中的图片信息
- edit_docx: 进行文本替换 / 关键字上色 / 图片替换 - edit_docx: 进行文本替换 / 关键字上色 / 图片替换
- upload_file: 上传文件到服务器
- download_file: 下载服务器上的文件
当前推荐的传输方式: 当前推荐的传输方式:
- stdio本地调试 - stdio本地调试
@@ -25,6 +27,7 @@
""" """
import argparse import argparse
import base64
import os import os
import tempfile import tempfile
import urllib.parse import urllib.parse
@@ -107,6 +110,17 @@ def _build_output_url(abs_output_path: str) -> Optional[str]:
return base.rstrip("/") + "/" + filename return base.rstrip("/") + "/" + filename
def _get_upload_dir() -> str:
"""
获取文件上传目录。
优先使用环境变量 MCP_UPLOAD_DIR否则使用当前目录下的 uploads 文件夹。
"""
upload_dir = os.getenv("MCP_UPLOAD_DIR", "./uploads")
os.makedirs(upload_dir, exist_ok=True)
return os.path.abspath(upload_dir)
@mcp.tool() @mcp.tool()
async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]: async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
""" """
@@ -127,7 +141,7 @@ async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]:
return imgs return imgs
@mcp.tool() @mcp.tool()
async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Optional[List[Dict[str, str]]] = None, image_replacements: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: async def edit_docx(input_docx_url: str, replacements: Optional[List[Dict[str, str]]] = None, image_replacements: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
""" """
使用原始 mcp_docx 逻辑对 DOCX 文件进行编辑。 使用原始 mcp_docx 逻辑对 DOCX 文件进行编辑。
@@ -138,7 +152,6 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt
参数: 参数:
- input_docx_url: 输入 DOCX 文件HTTP/HTTPS URL - input_docx_url: 输入 DOCX 文件HTTP/HTTPS URL
- output_docx_url: 输出 DOCX 文件HTTP/HTTPS URL
- replacements: 文本替换规则列表,例如: - replacements: 文本替换规则列表,例如:
[ [
{\"old\": \"旧标题\", \"new\": \"<span color='#FF0000'>新标题</span>\"}, {\"old\": \"旧标题\", \"new\": \"<span color='#FF0000'>新标题</span>\"},
@@ -159,7 +172,7 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt
""" """
tmp_input: Optional[str] = None tmp_input: Optional[str] = None
tmp_images: List[str] = [] tmp_images: List[str] = []
print(f"edit_docx: input_docx_url: {input_docx_url}, output_docx_url: {output_docx_url}, replacements: {replacements}, image_replacements: {image_replacements}") print(f"edit_docx: input_docx_url: {input_docx_url}, replacements: {replacements}, image_replacements: {image_replacements}")
try: try:
local_input = input_docx_url local_input = input_docx_url
if _is_url(input_docx_url): if _is_url(input_docx_url):
@@ -219,13 +232,13 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt
# 复用原始处理函数 # 复用原始处理函数
process( process(
input_docx=local_input, input_docx=local_input,
output_docx=output_docx_url, output_docx=local_input,
replacements=rep_pairs, replacements=rep_pairs,
image_replacements=img_pairs, image_replacements=img_pairs,
color_keywords=color_keywords, color_keywords=color_keywords,
) )
abs_out = os.path.abspath(output_docx_url) abs_out = os.path.abspath(local_input)
return { return {
"output_path": abs_out, "output_path": abs_out,
"output_url": _build_output_url(abs_out), "output_url": _build_output_url(abs_out),
@@ -245,6 +258,139 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt
pass pass
@mcp.tool()
async def upload_file(file_url: str, filename: Optional[str] = None) -> Dict[str, Any]:
"""
上传文件到服务器。
参数:
- file_url: 文件的 HTTP/HTTPS URL
- filename: 保存的文件名(可选),如果不指定则使用 URL 中的文件名
返回:
- {
"success": True/False,
"file_path": 保存的文件绝对路径,
"file_url": 如果配置了 MCP_OUTPUT_BASE_URL则为可访问该文件的 URL否则为 null,
"message": 操作结果消息
}
"""
try:
upload_dir = _get_upload_dir()
if not filename:
parsed = urllib.parse.urlparse(file_url)
filename = os.path.basename(parsed.path)
if not filename:
filename = "uploaded_file"
# 安全检查:防止路径遍历攻击
filename = os.path.basename(filename)
file_path = os.path.join(upload_dir, filename)
# 如果文件已存在,添加序号
base, ext = os.path.splitext(filename)
counter = 1
while os.path.exists(file_path):
filename = f"{base}_{counter}{ext}"
file_path = os.path.join(upload_dir, filename)
counter += 1
# 下载文件
resp = requests.get(file_url, stream=True, timeout=30)
resp.raise_for_status()
with open(file_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
return {
"success": True,
"file_path": file_path,
"file_url": _build_output_url(file_path),
"message": f"文件上传成功: {filename}"
}
except Exception as e:
return {
"success": False,
"file_path": None,
"file_url": None,
"message": f"文件上传失败: {str(e)}"
}
@mcp.tool()
async def download_file(filename: str) -> Dict[str, Any]:
"""
获取服务器上文件的下载 URL。
参数:
- filename: 文件名(相对于上传目录)
返回:
- {
"success": True/False,
"filename": 文件名,
"url": 文件下载 URL,
"size": 文件大小(字节),
"message": 操作结果消息
}
"""
try:
upload_dir = _get_upload_dir()
# 安全检查:防止路径遍历攻击
filename = os.path.basename(filename)
file_path = os.path.join(upload_dir, filename)
if not os.path.exists(file_path):
return {
"success": False,
"filename": None,
"url": None,
"size": 0,
"message": f"文件不存在: {filename}"
}
if not os.path.isfile(file_path):
return {
"success": False,
"filename": None,
"url": None,
"size": 0,
"message": f"不是文件: {filename}"
}
file_size = os.path.getsize(file_path)
file_url = _build_output_url(file_path)
if not file_url:
return {
"success": False,
"filename": filename,
"url": None,
"size": file_size,
"message": "未配置 MCP_OUTPUT_BASE_URL无法生成下载链接"
}
return {
"success": True,
"filename": filename,
"url": file_url,
"size": file_size,
"message": "获取下载链接成功"
}
except Exception as e:
return {
"success": False,
"filename": None,
"url": None,
"size": 0,
"message": f"获取下载链接失败: {str(e)}"
}
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DOCX MCP 服务器") parser = argparse.ArgumentParser(description="DOCX MCP 服务器")
parser.add_argument( parser.add_argument(