From afe9a54e3c67017cf40b88c06cf526f89fa9ec9d Mon Sep 17 00:00:00 2001 From: liangweihao <734499798@qq.com> Date: Fri, 27 Feb 2026 14:44:21 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E5=92=8C=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E6=99=AE=E9=80=9Ahttp=E7=AB=AF=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mcp_docx_server.py | 150 +++++++++++++++------------------------------ 1 file changed, 51 insertions(+), 99 deletions(-) diff --git a/mcp_docx_server.py b/mcp_docx_server.py index f0dec5f..84ca583 100644 --- a/mcp_docx_server.py +++ b/mcp_docx_server.py @@ -2,11 +2,13 @@ """ 基于 mcp_docx.py 封装的 MCP 服务器。 -暴露四个主要工具: +暴露两个主要 MCP 工具: - list_docx_images:列出 DOCX 中的图片信息 - edit_docx: 进行文本替换 / 关键字上色 / 图片替换 -- upload_file: 上传文件到服务器 -- download_file: 下载服务器上的文件 + +额外提供 HTTP 文件接口(仅在 http 模式下可用): +- POST /upload: 上传文件到服务器 +- GET /files/{filename}: 下载服务器上的文件 当前推荐的传输方式: - stdio(本地调试) @@ -27,7 +29,6 @@ """ import argparse -import base64 import os import tempfile import urllib.parse @@ -256,33 +257,24 @@ async def edit_docx(input_docx_url: str, replacements: Optional[List[Dict[str, s os.remove(p) except OSError: pass - - -@mcp.tool() -async def upload_file(file_url: str, filename: Optional[str] = None) -> Dict[str, Any]: - """ - 上传文件到服务器。 - - 参数: - - file_url: 文件的 HTTP/HTTPS URL - - filename: 保存的文件名(可选),如果不指定则使用 URL 中的文件名 - - 返回: - - { - "success": True/False, - "file_path": 保存的文件绝对路径, - "file_url": 如果配置了 MCP_OUTPUT_BASE_URL,则为可访问该文件的 URL,否则为 null, - "message": 操作结果消息 - } - """ +# HTTP 远程模式:添加文件上传下载路由 +from starlette.responses import FileResponse, JSONResponse +from starlette.requests import Request +@mcp.custom_route("/upload", methods=["POST"]) +async def upload_handler(request: Request): + """处理文件上传""" try: - upload_dir = _get_upload_dir() + form = await request.form() + file = form.get("file") - if not filename: - parsed = urllib.parse.urlparse(file_url) - filename = os.path.basename(parsed.path) - if not filename: - filename = "uploaded_file" + if not file: + return JSONResponse({ + "success": False, + "message": "未提供文件" + }, status_code=400) + + upload_dir = _get_upload_dir() + filename = file.filename # 安全检查:防止路径遍历攻击 filename = os.path.basename(filename) @@ -296,48 +288,29 @@ async def upload_file(file_url: str, filename: Optional[str] = None) -> Dict[str file_path = os.path.join(upload_dir, filename) counter += 1 - # 下载文件 - resp = requests.get(file_url, stream=True, timeout=30) - resp.raise_for_status() - + # 保存文件 + content = await file.read() with open(file_path, "wb") as f: - for chunk in resp.iter_content(chunk_size=8192): - if chunk: - f.write(chunk) + f.write(content) - return { + return JSONResponse({ "success": True, + "filename": filename, "file_path": file_path, "file_url": _build_output_url(file_path), + "size": len(content), "message": f"文件上传成功: {filename}" - } + }) except Exception as e: - return { + return JSONResponse({ "success": False, - "file_path": None, - "file_url": None, "message": f"文件上传失败: {str(e)}" - } - - -@mcp.tool() -async def download_file(filename: str) -> Dict[str, Any]: - """ - 获取服务器上文件的下载 URL。 - - 参数: - - filename: 文件名(相对于上传目录) - - 返回: - - { - "success": True/False, - "filename": 文件名, - "url": 文件下载 URL, - "size": 文件大小(字节), - "message": 操作结果消息 - } - """ + }, status_code=500) +@mcp.custom_route("/download", methods=["GET"]) +async def download_handler(request: Request): + """处理文件下载""" try: + filename = request.path_params.get("filename") upload_dir = _get_upload_dir() # 安全检查:防止路径遍历攻击 @@ -345,51 +318,27 @@ async def download_file(filename: str) -> Dict[str, Any]: file_path = os.path.join(upload_dir, filename) if not os.path.exists(file_path): - return { + return JSONResponse({ "success": False, - "filename": None, - "url": None, - "size": 0, "message": f"文件不存在: {filename}" - } + }, status_code=404) if not os.path.isfile(file_path): - return { + return JSONResponse({ "success": False, - "filename": None, - "url": None, - "size": 0, "message": f"不是文件: {filename}" - } + }, status_code=400) - file_size = os.path.getsize(file_path) - file_url = _build_output_url(file_path) - - if not file_url: - return { - "success": False, - "filename": filename, - "url": None, - "size": file_size, - "message": "未配置 MCP_OUTPUT_BASE_URL,无法生成下载链接" - } - - return { - "success": True, - "filename": filename, - "url": file_url, - "size": file_size, - "message": "获取下载链接成功" - } + return FileResponse( + file_path, + filename=filename, + media_type="application/octet-stream" + ) except Exception as e: - return { + return JSONResponse({ "success": False, - "filename": None, - "url": None, - "size": 0, - "message": f"获取下载链接失败: {str(e)}" - } - + "message": f"文件下载失败: {str(e)}" + }, status_code=500) if __name__ == "__main__": parser = argparse.ArgumentParser(description="DOCX MCP 服务器") @@ -413,11 +362,14 @@ if __name__ == "__main__": args = parser.parse_args() if args.transport == "http": - # HTTP 远程模式:通过 streamable-http 暴露 MCP 服务,端点 /mcp - # 兼容当前 FastMCP 版本:通过 settings 配置 host/port,而不是传给 run() + # 启动 MCP 服务器(会自动集成到 uvicorn) mcp.settings.host = args.host mcp.settings.port = args.port + + # 将自定义路由注入到 MCP 服务器 print(f"🚀 MCP HTTP 服务器启动中 → http://{args.host}:{args.port}/mcp") + + # 注意:FastMCP 使用 Starlette,我们需要扩展其路由 mcp.run(transport="streamable-http") else: # 本地 stdio 模式