读取文件url

This commit is contained in:
2026-02-12 17:41:02 +08:00
parent 13d84ff352
commit 4b98e94942
3 changed files with 191 additions and 55 deletions

View File

@@ -26,8 +26,11 @@
import argparse
import os
import tempfile
import urllib.parse
from typing import Any, Dict, List, Optional
import requests
from mcp.server.fastmcp import FastMCP
from mcp.server.transport_security import TransportSecuritySettings
@@ -58,13 +61,60 @@ mcp = FastMCP(
)
def _is_url(path: str) -> bool:
    """Return True if *path* looks like an HTTP or HTTPS URL.

    Only the scheme prefix is inspected; the rest of the string is not
    validated. The comparison is case-insensitive because URL schemes are
    case-insensitive, so ``HTTP://host/file`` is recognised as a URL instead
    of being mistaken for a local path.

    Args:
        path: A local file path or a URL.

    Returns:
        True when *path* starts with ``http://`` or ``https://`` (any case),
        False otherwise.
    """
    # "https://" is 8 characters long, so lowering only that prefix is enough
    # and avoids copying an arbitrarily long string.
    return path[:8].lower().startswith(("http://", "https://"))
def _download_to_temp(url: str, suffix: str = ".tmp") -> str:
    """Download a remote URL into a temporary file and return its local path.

    The response body is streamed to disk in 8 KiB chunks, so the whole
    payload is never held in memory. The caller owns the returned file and
    is responsible for deleting it once it is no longer needed.

    Args:
        url: HTTP/HTTPS URL to fetch.
        suffix: File-name suffix (typically the extension) for the temporary
            file.

    Returns:
        Path of the newly created temporary file.

    Raises:
        requests.RequestException: If the request fails or the server
            answers with an error status (via ``raise_for_status``).
        OSError: If the temporary file cannot be created or written.
    """
    # NOTE(review): ``url`` appears to come straight from tool callers and is
    # fetched without host allow-listing or a size cap -- confirm whether
    # SSRF / oversized-download protection is required for this deployment.
    #
    # The ``with`` block guarantees the streamed connection is released back
    # to the pool on every path, including when raise_for_status() raises.
    with requests.get(url, stream=True, timeout=30) as resp:
        resp.raise_for_status()

        fd, tmp_path = tempfile.mkstemp(suffix=suffix)
        try:
            with os.fdopen(fd, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        except BaseException:
            # Do not leave a partially written file behind; the original
            # error (or interrupt) is re-raised after best-effort cleanup.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    return tmp_path
def _build_output_url(abs_output_path: str) -> Optional[str]:
    """Build a download URL for an output file from ``MCP_OUTPUT_BASE_URL``.

    Convention:
        - ``MCP_OUTPUT_BASE_URL`` looks like ``http://host:port/files/``.
        - Result = base without trailing slashes + ``/`` + the file name.

    Only the base name of *abs_output_path* is used; its directory part is
    dropped. The file name is percent-encoded so that names containing
    spaces, ``#``, ``?``, ``%`` or non-ASCII characters still yield a valid
    URL. Plain ASCII names are unchanged.

    Args:
        abs_output_path: Path of the generated output file.

    Returns:
        The URL string, or None when ``MCP_OUTPUT_BASE_URL`` is unset or
        empty.
    """
    # Local import keeps this helper self-contained regardless of how the
    # module-level imports are arranged.
    from urllib.parse import quote

    base = os.getenv("MCP_OUTPUT_BASE_URL")
    if not base:
        return None

    filename = os.path.basename(abs_output_path)
    return base.rstrip("/") + "/" + quote(filename)
@mcp.tool()
async def list_docx_images(docx_path: str) -> List[Dict[str, Any]]:
"""
列出指定 DOCX 文件中的所有图片信息。
参数:
- docx_path: DOCX 文件的路径(相对或绝对)
- docx_path: DOCX 文件的路径(相对或绝对),也可以是 HTTP/HTTPS URL。
返回:
- 图片信息列表,每一项包含:
@@ -74,14 +124,29 @@ async def list_docx_images(docx_path: str) -> List[Dict[str, Any]]:
- docpr_name: Word 内部的图片名称
- width_cm / height_cm: 近似尺寸(厘米),可能为 None
"""
if not os.path.exists(docx_path):
raise FileNotFoundError(f"DOCX 文件不存在: {docx_path}")
tmp_file: Optional[str] = None
try:
local_path = docx_path
if _is_url(docx_path):
parsed = urllib.parse.urlparse(docx_path)
ext = os.path.splitext(parsed.path)[1] or ".docx"
tmp_file = _download_to_temp(docx_path, suffix=ext)
local_path = tmp_file
imgs = get_images_info(docx_path)
# 为了避免泄露容器内部路径,屏蔽 abs_path 字段
for img in imgs:
img.pop("abs_path", None)
return imgs
if not os.path.exists(local_path):
raise FileNotFoundError(f"DOCX 文件不存在: {docx_path}")
imgs = get_images_info(local_path)
# 为了避免泄露容器内部路径,屏蔽 abs_path 字段
for img in imgs:
img.pop("abs_path", None)
return imgs
finally:
if tmp_file and os.path.exists(tmp_file):
try:
os.remove(tmp_file)
except OSError:
pass
@mcp.tool()
@@ -100,7 +165,7 @@ async def edit_docx(
- 替换指定序号的图片
参数:
- input_docx: 输入 DOCX 文件路径
- input_docx: 输入 DOCX 文件路径,或 HTTP/HTTPS URL
- output_docx: 输出 DOCX 文件路径
- replacements: 文本替换规则列表,例如:
[
@@ -112,56 +177,100 @@ async def edit_docx(
{\"index\": 1, \"file\": \"new_chart.png\"},
{\"index\": 2, \"file\": \"new_photo.jpg\"}
]
其中 file 字段同样可以是本地路径或 HTTP/HTTPS URL。
返回:
- {\"output_path\": 生成的 DOCX 绝对路径}
- {
\"output_path\": 生成的 DOCX 绝对路径,
\"output_url\": 如果配置了 MCP_OUTPUT_BASE_URL则为可访问该文件的 URL否则为 null
}
"""
if not os.path.exists(input_docx):
raise FileNotFoundError(f"输入 DOCX 文件不存在: {input_docx}")
tmp_input: Optional[str] = None
tmp_images: List[str] = []
if replacements is None:
replacements = []
if image_replacements is None:
image_replacements = []
try:
local_input = input_docx
if _is_url(input_docx):
parsed = urllib.parse.urlparse(input_docx)
ext = os.path.splitext(parsed.path)[1] or ".docx"
tmp_input = _download_to_temp(input_docx, suffix=ext)
local_input = tmp_input
# 解析文本替换与颜色关键字(复用 CLI 逻辑)
rep_pairs = []
color_keywords = []
for item in replacements:
old = item.get("old")
new_raw = item.get("new")
if not old:
continue
if new_raw is None:
new_raw = ""
new_plain, spans = _parse_span_replacement(new_raw)
rep_pairs.append((old, new_plain))
color_keywords.extend(spans)
if not os.path.exists(local_input):
raise FileNotFoundError(f"输入 DOCX 文件不存在: {input_docx}")
# 处理图片替换参数
img_pairs = []
for item in image_replacements:
try:
idx = int(item.get("index"))
except (TypeError, ValueError):
continue
path = item.get("file")
if not path:
continue
if not os.path.exists(path):
raise FileNotFoundError(f"图片文件不存在: {path}")
img_pairs.append((idx, path))
if replacements is None:
replacements = []
if image_replacements is None:
image_replacements = []
# 复用原始处理函数
process(
input_docx=input_docx,
output_docx=output_docx,
replacements=rep_pairs,
image_replacements=img_pairs,
color_keywords=color_keywords,
)
# 解析文本替换与颜色关键字(复用 CLI 逻辑)
rep_pairs = []
color_keywords = []
for item in replacements:
old = item.get("old")
new_raw = item.get("new")
if not old:
continue
if new_raw is None:
new_raw = ""
new_plain, spans = _parse_span_replacement(new_raw)
rep_pairs.append((old, new_plain))
color_keywords.extend(spans)
return {"output_path": os.path.abspath(output_docx)}
# 处理图片替换参数(支持本地路径或 URL
img_pairs = []
for item in image_replacements:
try:
idx = int(item.get("index"))
except (TypeError, ValueError):
continue
path = item.get("file")
if not path:
continue
local_img = path
if _is_url(path):
parsed = urllib.parse.urlparse(path)
ext = os.path.splitext(parsed.path)[1] or ""
suffix = ext if ext else ".img"
tmp_img = _download_to_temp(path, suffix=suffix)
tmp_images.append(tmp_img)
local_img = tmp_img
if not os.path.exists(local_img):
raise FileNotFoundError(f"图片文件不存在: {path}")
img_pairs.append((idx, local_img))
# 复用原始处理函数
process(
input_docx=local_input,
output_docx=output_docx,
replacements=rep_pairs,
image_replacements=img_pairs,
color_keywords=color_keywords,
)
abs_out = os.path.abspath(output_docx)
return {
"output_path": abs_out,
"output_url": _build_output_url(abs_out),
}
finally:
if tmp_input and os.path.exists(tmp_input):
try:
os.remove(tmp_input)
except OSError:
pass
for p in tmp_images:
if os.path.exists(p):
try:
os.remove(p)
except OSError:
pass
if __name__ == "__main__":