上传和下载使用普通http端口

This commit is contained in:
2026-02-27 14:44:21 +08:00
parent 61340758b3
commit afe9a54e3c

View File

@@ -2,11 +2,13 @@
""" """
基于 mcp_docx.py 封装的 MCP 服务器。 基于 mcp_docx.py 封装的 MCP 服务器。
暴露个主要工具: 暴露个主要 MCP 工具:
- list_docx_images列出 DOCX 中的图片信息 - list_docx_images列出 DOCX 中的图片信息
- edit_docx: 进行文本替换 / 关键字上色 / 图片替换 - edit_docx: 进行文本替换 / 关键字上色 / 图片替换
- upload_file: 上传文件到服务器
- download_file: 下载服务器上的文件 额外提供 HTTP 文件接口(仅在 http 模式下可用):
- POST /upload: 上传文件到服务器
- GET /files/{filename}: 下载服务器上的文件
当前推荐的传输方式: 当前推荐的传输方式:
- stdio本地调试 - stdio本地调试
@@ -27,7 +29,6 @@
""" """
import argparse import argparse
import base64
import os import os
import tempfile import tempfile
import urllib.parse import urllib.parse
@@ -256,33 +257,24 @@ async def edit_docx(input_docx_url: str, replacements: Optional[List[Dict[str, s
os.remove(p) os.remove(p)
except OSError: except OSError:
pass pass
# HTTP 远程模式:添加文件上传下载路由
from starlette.responses import FileResponse, JSONResponse
@mcp.tool() from starlette.requests import Request
async def upload_file(file_url: str, filename: Optional[str] = None) -> Dict[str, Any]: @mcp.custom_route("/upload", methods=["POST"])
""" async def upload_handler(request: Request):
上传文件到服务器。 """处理文件上传"""
参数:
- file_url: 文件的 HTTP/HTTPS URL
- filename: 保存的文件名(可选),如果不指定则使用 URL 中的文件名
返回:
- {
"success": True/False,
"file_path": 保存的文件绝对路径,
"file_url": 如果配置了 MCP_OUTPUT_BASE_URL则为可访问该文件的 URL否则为 null,
"message": 操作结果消息
}
"""
try: try:
upload_dir = _get_upload_dir() form = await request.form()
file = form.get("file")
if not filename: if not file:
parsed = urllib.parse.urlparse(file_url) return JSONResponse({
filename = os.path.basename(parsed.path) "success": False,
if not filename: "message": "未提供文件"
filename = "uploaded_file" }, status_code=400)
upload_dir = _get_upload_dir()
filename = file.filename
# 安全检查:防止路径遍历攻击 # 安全检查:防止路径遍历攻击
filename = os.path.basename(filename) filename = os.path.basename(filename)
@@ -296,48 +288,29 @@ async def upload_file(file_url: str, filename: Optional[str] = None) -> Dict[str
file_path = os.path.join(upload_dir, filename) file_path = os.path.join(upload_dir, filename)
counter += 1 counter += 1
# 下载文件 # 保存文件
resp = requests.get(file_url, stream=True, timeout=30) content = await file.read()
resp.raise_for_status()
with open(file_path, "wb") as f: with open(file_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192): f.write(content)
if chunk:
f.write(chunk)
return { return JSONResponse({
"success": True, "success": True,
"filename": filename,
"file_path": file_path, "file_path": file_path,
"file_url": _build_output_url(file_path), "file_url": _build_output_url(file_path),
"size": len(content),
"message": f"文件上传成功: {filename}" "message": f"文件上传成功: {filename}"
} })
except Exception as e: except Exception as e:
return { return JSONResponse({
"success": False, "success": False,
"file_path": None,
"file_url": None,
"message": f"文件上传失败: {str(e)}" "message": f"文件上传失败: {str(e)}"
} }, status_code=500)
@mcp.custom_route("/download", methods=["GET"])
async def download_handler(request: Request):
@mcp.tool() """处理文件下载"""
async def download_file(filename: str) -> Dict[str, Any]:
"""
获取服务器上文件的下载 URL。
参数:
- filename: 文件名(相对于上传目录)
返回:
- {
"success": True/False,
"filename": 文件名,
"url": 文件下载 URL,
"size": 文件大小(字节),
"message": 操作结果消息
}
"""
try: try:
filename = request.path_params.get("filename")
upload_dir = _get_upload_dir() upload_dir = _get_upload_dir()
# 安全检查:防止路径遍历攻击 # 安全检查:防止路径遍历攻击
@@ -345,51 +318,27 @@ async def download_file(filename: str) -> Dict[str, Any]:
file_path = os.path.join(upload_dir, filename) file_path = os.path.join(upload_dir, filename)
if not os.path.exists(file_path): if not os.path.exists(file_path):
return { return JSONResponse({
"success": False, "success": False,
"filename": None,
"url": None,
"size": 0,
"message": f"文件不存在: {filename}" "message": f"文件不存在: {filename}"
} }, status_code=404)
if not os.path.isfile(file_path): if not os.path.isfile(file_path):
return { return JSONResponse({
"success": False, "success": False,
"filename": None,
"url": None,
"size": 0,
"message": f"不是文件: {filename}" "message": f"不是文件: {filename}"
} }, status_code=400)
file_size = os.path.getsize(file_path) return FileResponse(
file_url = _build_output_url(file_path) file_path,
filename=filename,
if not file_url: media_type="application/octet-stream"
return { )
"success": False,
"filename": filename,
"url": None,
"size": file_size,
"message": "未配置 MCP_OUTPUT_BASE_URL无法生成下载链接"
}
return {
"success": True,
"filename": filename,
"url": file_url,
"size": file_size,
"message": "获取下载链接成功"
}
except Exception as e: except Exception as e:
return { return JSONResponse({
"success": False, "success": False,
"filename": None, "message": f"文件下载失败: {str(e)}"
"url": None, }, status_code=500)
"size": 0,
"message": f"获取下载链接失败: {str(e)}"
}
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DOCX MCP 服务器") parser = argparse.ArgumentParser(description="DOCX MCP 服务器")
@@ -413,11 +362,14 @@ if __name__ == "__main__":
args = parser.parse_args() args = parser.parse_args()
if args.transport == "http": if args.transport == "http":
# HTTP 远程模式:通过 streamable-http 暴露 MCP 服务,端点 /mcp # 启动 MCP 服务器(会自动集成到 uvicorn
# 兼容当前 FastMCP 版本:通过 settings 配置 host/port而不是传给 run()
mcp.settings.host = args.host mcp.settings.host = args.host
mcp.settings.port = args.port mcp.settings.port = args.port
# 将自定义路由注入到 MCP 服务器
print(f"🚀 MCP HTTP 服务器启动中 → http://{args.host}:{args.port}/mcp") print(f"🚀 MCP HTTP 服务器启动中 → http://{args.host}:{args.port}/mcp")
# 注意FastMCP 使用 Starlette我们需要扩展其路由
mcp.run(transport="streamable-http") mcp.run(transport="streamable-http")
else: else:
# 本地 stdio 模式 # 本地 stdio 模式