diff --git a/mcp_docx_server.py b/mcp_docx_server.py index 0cc9a5d..f0dec5f 100644 --- a/mcp_docx_server.py +++ b/mcp_docx_server.py @@ -2,9 +2,11 @@ """ 基于 mcp_docx.py 封装的 MCP 服务器。 -暴露两个主要工具: +暴露四个主要工具: - list_docx_images:列出 DOCX 中的图片信息 - edit_docx: 进行文本替换 / 关键字上色 / 图片替换 +- upload_file: 上传文件到服务器 +- download_file: 下载服务器上的文件 当前推荐的传输方式: - stdio(本地调试) @@ -25,6 +27,7 @@ """ import argparse +import base64 import os import tempfile import urllib.parse @@ -107,6 +110,17 @@ def _build_output_url(abs_output_path: str) -> Optional[str]: return base.rstrip("/") + "/" + filename +def _get_upload_dir() -> str: + """ + 获取文件上传目录。 + + 优先使用环境变量 MCP_UPLOAD_DIR,否则使用当前目录下的 uploads 文件夹。 + """ + upload_dir = os.getenv("MCP_UPLOAD_DIR", "./uploads") + os.makedirs(upload_dir, exist_ok=True) + return os.path.abspath(upload_dir) + + @mcp.tool() async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]: """ @@ -127,7 +141,7 @@ async def list_docx_images(docx_url: str) -> List[Dict[str, Any]]: return imgs @mcp.tool() -async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Optional[List[Dict[str, str]]] = None, image_replacements: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: +async def edit_docx(input_docx_url: str, replacements: Optional[List[Dict[str, str]]] = None, image_replacements: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]: """ 使用原始 mcp_docx 逻辑对 DOCX 文件进行编辑。 @@ -138,7 +152,6 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt 参数: - input_docx_url: 输入 DOCX 文件HTTP/HTTPS URL - - output_docx_url: 输出 DOCX 文件HTTP/HTTPS URL - replacements: 文本替换规则列表,例如: [ {\"old\": \"旧标题\", \"new\": \"新标题\"}, @@ -159,7 +172,7 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt """ tmp_input: Optional[str] = None tmp_images: List[str] = [] - print(f"edit_docx: input_docx_url: {input_docx_url}, output_docx_url: {output_docx_url}, replacements: {replacements}, image_replacements: {image_replacements}") + print(f"edit_docx: input_docx_url: {input_docx_url}, replacements: {replacements}, image_replacements: {image_replacements}") try: local_input = input_docx_url if _is_url(input_docx_url): @@ -219,13 +232,13 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt # 复用原始处理函数 process( input_docx=local_input, - output_docx=output_docx_url, + output_docx=local_input, replacements=rep_pairs, image_replacements=img_pairs, color_keywords=color_keywords, ) - abs_out = os.path.abspath(output_docx_url) + abs_out = os.path.abspath(local_input) return { "output_path": abs_out, "output_url": _build_output_url(abs_out), @@ -245,6 +258,139 @@ async def edit_docx(input_docx_url: str, output_docx_url: str, replacements: Opt pass +@mcp.tool() +async def upload_file(file_url: str, filename: Optional[str] = None) -> Dict[str, Any]: + """ + 上传文件到服务器。 + + 参数: + - file_url: 文件的 HTTP/HTTPS URL + - filename: 保存的文件名(可选),如果不指定则使用 URL 中的文件名 + + 返回: + - { + "success": True/False, + "file_path": 保存的文件绝对路径, + "file_url": 如果配置了 MCP_OUTPUT_BASE_URL,则为可访问该文件的 URL,否则为 null, + "message": 操作结果消息 + } + """ + try: + upload_dir = _get_upload_dir() + + if not filename: + parsed = urllib.parse.urlparse(file_url) + filename = os.path.basename(parsed.path) + if not filename: + filename = "uploaded_file" + + # 安全检查:防止路径遍历攻击 + filename = os.path.basename(filename) + file_path = os.path.join(upload_dir, filename) + + # 如果文件已存在,添加序号 + base, ext = os.path.splitext(filename) + counter = 1 + while os.path.exists(file_path): + filename = f"{base}_{counter}{ext}" + file_path = os.path.join(upload_dir, filename) + counter += 1 + + # 下载文件 + resp = requests.get(file_url, stream=True, timeout=30) + resp.raise_for_status() + + with open(file_path, "wb") as f: + for chunk in resp.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + return { + "success": True, + "file_path": file_path, + "file_url": _build_output_url(file_path), + "message": f"文件上传成功: {filename}" + } + except Exception as e: + return { + "success": False, + "file_path": None, + "file_url": None, + "message": f"文件上传失败: {str(e)}" + } + + +@mcp.tool() +async def download_file(filename: str) -> Dict[str, Any]: + """ + 获取服务器上文件的下载 URL。 + + 参数: + - filename: 文件名(相对于上传目录) + + 返回: + - { + "success": True/False, + "filename": 文件名, + "url": 文件下载 URL, + "size": 文件大小(字节), + "message": 操作结果消息 + } + """ + try: + upload_dir = _get_upload_dir() + + # 安全检查:防止路径遍历攻击 + filename = os.path.basename(filename) + file_path = os.path.join(upload_dir, filename) + + if not os.path.exists(file_path): + return { + "success": False, + "filename": None, + "url": None, + "size": 0, + "message": f"文件不存在: {filename}" + } + + if not os.path.isfile(file_path): + return { + "success": False, + "filename": None, + "url": None, + "size": 0, + "message": f"不是文件: {filename}" + } + + file_size = os.path.getsize(file_path) + file_url = _build_output_url(file_path) + + if not file_url: + return { + "success": False, + "filename": filename, + "url": None, + "size": file_size, + "message": "未配置 MCP_OUTPUT_BASE_URL,无法生成下载链接" + } + + return { + "success": True, + "filename": filename, + "url": file_url, + "size": file_size, + "message": "获取下载链接成功" + } + except Exception as e: + return { + "success": False, + "filename": None, + "url": None, + "size": 0, + "message": f"获取下载链接失败: {str(e)}" + } + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="DOCX MCP 服务器") parser.add_argument(