Fix XML encoding error

This commit is contained in:
Jarvis
2026-01-01 19:22:44 +08:00
parent 530faa6835
commit f57e818c19

View File

@@ -7,45 +7,48 @@ from rfeed import Item, Feed, Guid
from email.utils import parsedate_to_datetime from email.utils import parsedate_to_datetime
# --- 配置区域 --- # --- 配置区域 ---
OUTPUT_FILE = "filtered_feed.xml" # 输出文件 OUTPUT_FILE = "filtered_feed.xml"
MAX_ITEMS = 10000 # RSS中保留的最大条目数滚动窗口 MAX_ITEMS = 1000
JOURNALS_FILE = 'journals.dat'
KEYWORDS_FILE = 'keywords.dat'
# ---------------- # ----------------
def load_config(filename, env_var_name=None): def load_config(filename, env_var_name=None):
""" """(保持你之前的 load_config 代码不变)"""
优先从环境变量读取配置(用于 GitHub Actions 保护隐私), # ... 请保留你之前为了隐私修改过的 load_config 函数 ...
如果环境变量不存在,则读取本地文件(用于本地测试)。 # 这里为了篇幅省略,请直接复用你现在的 load_config
"""
# 1. 尝试读取环境变量 (Secrets)
if env_var_name and os.environ.get(env_var_name): if env_var_name and os.environ.get(env_var_name):
print(f"Loading config from environment variable: {env_var_name}") print(f"Loading config from environment variable: {env_var_name}")
# 假设环境变量里用分号 ; 或者换行符分隔
content = os.environ[env_var_name] content = os.environ[env_var_name]
# 兼容换行符或分号分隔
if '\n' in content: if '\n' in content:
return [line.strip() for line in content.split('\n') if line.strip()] return [line.strip() for line in content.split('\n') if line.strip()]
else: else:
return [line.strip() for line in content.split(';') if line.strip()] return [line.strip() for line in content.split(';') if line.strip()]
# 2. 尝试读取本地文件
if os.path.exists(filename): if os.path.exists(filename):
print(f"Loading config from local file: {filename}") print(f"Loading config from local file: {filename}")
with open(filename, 'r', encoding='utf-8') as f: with open(filename, 'r', encoding='utf-8') as f:
return [line.strip() for line in f if line.strip() and not line.startswith('#')] return [line.strip() for line in f if line.strip() and not line.startswith('#')]
print(f"Warning: No config found for {filename} or {env_var_name}")
return [] return []
# --- 新增XML 非法字符清洗函数 ---
def remove_illegal_xml_chars(text):
"""
移除 XML 1.0 不支持的 ASCII 控制字符 (Char value 0-8, 11-12, 14-31)
"""
if not text:
return ""
# 正则表达式:匹配 ASCII 0-8, 11, 12, 14-31 这些控制字符
# \x09是tab, \x0a是换行, \x0d是回车这些是合法的所以不删
illegal_chars = r'[\x00-\x08\x0b\x0c\x0e-\x1f]'
return re.sub(illegal_chars, '', text)
def convert_struct_time_to_datetime(struct_time): def convert_struct_time_to_datetime(struct_time):
"""将 feedparser 的时间结构转换为 datetime 对象"""
if not struct_time: if not struct_time:
return datetime.datetime.now() return datetime.datetime.now()
return datetime.datetime.fromtimestamp(time.mktime(struct_time)) return datetime.datetime.fromtimestamp(time.mktime(struct_time))
def parse_rss(rss_url, retries=3): def parse_rss(rss_url, retries=3):
"""解析在线 RSS 订阅""" # (保持不变)
print(f"Fetching: {rss_url}...") print(f"Fetching: {rss_url}...")
for attempt in range(retries): for attempt in range(retries):
try: try:
@@ -54,7 +57,6 @@ def parse_rss(rss_url, retries=3):
journal_title = feed.feed.get('title', 'Unknown Journal') journal_title = feed.feed.get('title', 'Unknown Journal')
for entry in feed.entries: for entry in feed.entries:
# 获取标准时间
pub_struct = entry.get('published_parsed', entry.get('updated_parsed')) pub_struct = entry.get('published_parsed', entry.get('updated_parsed'))
pub_date = convert_struct_time_to_datetime(pub_struct) pub_date = convert_struct_time_to_datetime(pub_struct)
@@ -64,7 +66,7 @@ def parse_rss(rss_url, retries=3):
'pub_date': pub_date, 'pub_date': pub_date,
'summary': entry.get('summary', entry.get('description', '')), 'summary': entry.get('summary', entry.get('description', '')),
'journal': journal_title, 'journal': journal_title,
'id': entry.get('id', entry.get('link', '')) # ID 用于去重 'id': entry.get('id', entry.get('link', ''))
}) })
return entries return entries
except Exception as e: except Exception as e:
@@ -73,46 +75,46 @@ def parse_rss(rss_url, retries=3):
return [] return []
def get_existing_items(): def get_existing_items():
"""读取上一次生成的 XML 文件,保留历史数据""" # (保持不变,但增加容错:如果 XML 坏了,就返回空列表重新抓)
if not os.path.exists(OUTPUT_FILE): if not os.path.exists(OUTPUT_FILE):
return [] return []
print(f"Loading existing items from {OUTPUT_FILE}...") print(f"Loading existing items from {OUTPUT_FILE}...")
try: try:
# feedparser 也可以解析本地 XML 文件
feed = feedparser.parse(OUTPUT_FILE) feed = feedparser.parse(OUTPUT_FILE)
# 如果解析出错(比如现在的 invalid charfeedparser 可能会拿到空或者 bozo 标志
if hasattr(feed, 'bozo') and feed.bozo == 1:
print("Warning: Existing XML file might be corrupted. Ignoring old items.")
# 这里可以选择 return [] 直接丢弃坏掉的旧数据,重新开始
# return []
# 或者尝试读取能读的部分(取决于损坏位置)
entries = [] entries = []
for entry in feed.entries: for entry in feed.entries:
# 恢复 datetime 对象
pub_struct = entry.get('published_parsed') pub_struct = entry.get('published_parsed')
pub_date = convert_struct_time_to_datetime(pub_struct) pub_date = convert_struct_time_to_datetime(pub_struct)
# 注意:生成的 XML 标题通常是 "[Journal] Title",这里我们需要尽量保持原样
# 或者为了简单起见,我们直接存储读取到的内容
entries.append({ entries.append({
'title': entry.get('title', ''), # 这里标题已经包含 [Journal] 前缀了 'title': entry.get('title', ''),
'link': entry.get('link', ''), 'link': entry.get('link', ''),
'pub_date': pub_date, 'pub_date': pub_date,
'summary': entry.get('summary', ''), 'summary': entry.get('summary', ''),
'journal': entry.get('author', ''), # 我们在生成时把 journal 存入了 author 字段 'journal': entry.get('author', ''),
'id': entry.get('id', entry.get('link', '')), 'id': entry.get('id', entry.get('link', '')),
'is_old': True # 标记为旧数据,不需要再次关键词匹配 'is_old': True
}) })
return entries return entries
except Exception as e: except Exception as e:
print(f"Error reading existing file: {e}") print(f"Error reading existing file: {e}")
return [] return [] # 如果旧文件读不了,就当做第一次运行
def match_entry(entry, queries): def match_entry(entry, queries):
"""关键词匹配""" # (保持不变)
# 构造待搜索文本
text_to_search = (entry['title'] + " " + entry['summary']).lower() text_to_search = (entry['title'] + " " + entry['summary']).lower()
for query in queries: for query in queries:
keywords = [k.strip().lower() for k in query.split('AND')] keywords = [k.strip().lower() for k in query.split('AND')]
match = True match = True
for keyword in keywords: for keyword in keywords:
# 使用简单的字符串包含判断,比正则更快,且对科研关键词通常足够
if keyword not in text_to_search: if keyword not in text_to_search:
match = False match = False
break break
@@ -121,37 +123,37 @@ def match_entry(entry, queries):
return False return False
def generate_rss_xml(items): def generate_rss_xml(items):
"""生成 RSS 2.0 XML 文件""" """生成 RSS 2.0 XML 文件 (已加入非法字符清洗)"""
rss_items = [] rss_items = []
# 按时间倒序排列(最新的在最前)
# 确保所有 item 都有 pub_date 且是 datetime 对象
items.sort(key=lambda x: x['pub_date'], reverse=True) items.sort(key=lambda x: x['pub_date'], reverse=True)
# 截取最新的 MAX_ITEMS 条
items = items[:MAX_ITEMS] items = items[:MAX_ITEMS]
for item in items: for item in items:
# 如果是旧数据,标题可能已经是 "[Journal] Title" 格式,需要避免重复添加前缀
title = item['title'] title = item['title']
if not item.get('is_old', False): if not item.get('is_old', False):
# 新数据,添加期刊前缀
title = f"[{item['journal']}] {item['title']}" title = f"[{item['journal']}] {item['title']}"
# --- 关键修改:清洗数据 ---
clean_title = remove_illegal_xml_chars(title)
clean_summary = remove_illegal_xml_chars(item['summary'])
clean_journal = remove_illegal_xml_chars(item['journal'])
# -----------------------
rss_item = Item( rss_item = Item(
title = title, title = clean_title,
link = item['link'], link = item['link'],
description = item['summary'], description = clean_summary,
author = item['journal'], # 借用 author 字段存储期刊名 author = clean_journal,
guid = Guid(item['id']), guid = Guid(item['id']),
pubDate = item['pub_date'] pubDate = item['pub_date']
) )
rss_items.append(rss_item) rss_items.append(rss_item)
feed = Feed( feed = Feed(
title = "My Customized Papers (Auto-Filtered)", title = "My Customized Papers",
link = "https://github.com/your_username/your_repo", link = "https://github.com/your_username/your_repo",
description = "Aggregated research papers based on keywords", description = "Aggregated research papers",
language = "en-US", language = "en-US",
lastBuildDate = datetime.datetime.now(), lastBuildDate = datetime.datetime.now(),
items = rss_items items = rss_items
@@ -162,41 +164,34 @@ def generate_rss_xml(items):
print(f"Successfully generated {OUTPUT_FILE} with {len(rss_items)} items.") print(f"Successfully generated {OUTPUT_FILE} with {len(rss_items)} items.")
def main(): def main():
# 1. 读取配置 # 请确保这里的调用参数与你目前的 secrets 配置一致
rss_urls = load_config('journals.dat', 'RSS_JOURNALS') rss_urls = load_config('journals.dat', 'RSS_JOURNALS')
queries = load_config('keywords.dat', 'RSS_KEYWORDS') queries = load_config('keywords.dat', 'RSS_KEYWORDS')
if not rss_urls or not queries: if not rss_urls or not queries:
print("Error: Configuration files are empty or missing.") print("Error: Configuration files are empty or missing.")
return return
# 2. 读取旧数据(核心去重策略:保留历史)
existing_entries = get_existing_items() existing_entries = get_existing_items()
# 创建一个已有 ID 的集合,用于快速查重
seen_ids = set(entry['id'] for entry in existing_entries) seen_ids = set(entry['id'] for entry in existing_entries)
all_entries = existing_entries.copy() all_entries = existing_entries.copy()
new_count = 0 new_count = 0
# 3. 抓取新数据
print("Starting RSS fetch from remote...") print("Starting RSS fetch from remote...")
for url in rss_urls: for url in rss_urls:
fetched_entries = parse_rss(url) fetched_entries = parse_rss(url)
for entry in fetched_entries: for entry in fetched_entries:
# 查重:如果 ID 已经在旧数据里,直接跳过
if entry['id'] in seen_ids: if entry['id'] in seen_ids:
continue continue
# 关键词匹配
if match_entry(entry, queries): if match_entry(entry, queries):
all_entries.append(entry) all_entries.append(entry)
seen_ids.add(entry['id']) seen_ids.add(entry['id'])
new_count += 1 new_count += 1
print(f"Match found: {entry['title'][:50]}...") print(f"Match found: {entry['title'][:50]}...")
print(f"Added {new_count} new entries. Total entries before limit: {len(all_entries)}") print(f"Added {new_count} new entries.")
# 4. 生成新文件 (包含排序和截断)
generate_rss_xml(all_entries) generate_rss_xml(all_entries)
if __name__ == '__main__': if __name__ == '__main__':