paper-feed/get_RSS.py

import feedparser
import re
import os
import datetime
import time
from rfeed import Item, Feed, Guid
from email.utils import parsedate_to_datetime

# --- Configuration ---
OUTPUT_FILE = "filtered_feed.xml"   # path of the generated feed
MAX_ITEMS = 1000                    # maximum number of items kept in the output feed
# ----------------

def load_config(filename, env_var_name=None):
    """Load a list of config lines from an environment variable or a local file.

    The environment variable (if set) takes precedence; its content may be
    newline-separated or semicolon-separated. A local file is read line by
    line, skipping blank lines and '#' comments.
    """
    if env_var_name and os.environ.get(env_var_name):
        print(f"Loading config from environment variable: {env_var_name}")
        content = os.environ[env_var_name]
        if '\n' in content:
            return [line.strip() for line in content.split('\n') if line.strip()]
        else:
            return [line.strip() for line in content.split(';') if line.strip()]
    if os.path.exists(filename):
        print(f"Loading config from local file: {filename}")
        with open(filename, 'r', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip() and not line.startswith('#')]
    return []
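
# Example config formats accepted by load_config (illustrative values only):
#
#   journals.dat / RSS_JOURNALS -- one feed URL per line (or ';'-separated):
#       https://example.org/journal_a/rss.xml
#       https://example.org/journal_b/rss.xml
#
#   keywords.dat / RSS_KEYWORDS -- one query per line; 'AND' joins keywords
#   that must all appear in an entry's title or summary (see match_entry):
#       graphene AND transport
#       perovskite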

# --- XML illegal-character sanitizer ---
def remove_illegal_xml_chars(text):
    """
    Remove the ASCII control characters that XML 1.0 does not allow
    (char values 0-8, 11-12, 14-31).
    """
    if not text:
        return ""
    # Regex matching ASCII control characters 0-8, 11, 12, and 14-31.
    # \x09 (tab), \x0a (newline), and \x0d (carriage return) are legal
    # in XML 1.0, so they are kept.
    illegal_chars = r'[\x00-\x08\x0b\x0c\x0e-\x1f]'
    return re.sub(illegal_chars, '', text)
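
# For example (hypothetical input), an embedded NUL byte is stripped while
# tabs and newlines survive:
#   remove_illegal_xml_chars("Title\x00 with\tNUL") == "Title with\tNUL"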

def convert_struct_time_to_datetime(struct_time):
    """Convert a time.struct_time from feedparser to a datetime (now() if missing)."""
    if not struct_time:
        return datetime.datetime.now()
    return datetime.datetime.fromtimestamp(time.mktime(struct_time))
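
# Caveat: feedparser's *_parsed fields are UTC struct_times, while
# time.mktime() treats its argument as local time, so pub_date can be
# shifted by the local UTC offset. A timezone-correct variant (a sketch,
# not used above) would be:
#   datetime.datetime(*struct_time[:6], tzinfo=datetime.timezone.utc)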

def parse_rss(rss_url, retries=3):
    """Fetch one feed and return its entries as dicts, retrying on failure."""
    print(f"Fetching: {rss_url}...")
    for attempt in range(retries):
        try:
            feed = feedparser.parse(rss_url)
            entries = []
            journal_title = feed.feed.get('title', 'Unknown Journal')
            for entry in feed.entries:
                pub_struct = entry.get('published_parsed', entry.get('updated_parsed'))
                pub_date = convert_struct_time_to_datetime(pub_struct)
                entries.append({
                    'title': entry.get('title', ''),
                    'link': entry.get('link', ''),
                    'pub_date': pub_date,
                    'summary': entry.get('summary', entry.get('description', '')),
                    'journal': journal_title,
                    'id': entry.get('id', entry.get('link', ''))
                })
            return entries
        except Exception as e:
            print(f"Error parsing {rss_url}: {e}")
            time.sleep(2)
    return []

def get_existing_items():
    """Load items from the previously generated feed, tolerating a corrupted file."""
    if not os.path.exists(OUTPUT_FILE):
        return []
    print(f"Loading existing items from {OUTPUT_FILE}...")
    try:
        feed = feedparser.parse(OUTPUT_FILE)
        # If parsing failed (e.g. on an invalid character), feedparser may
        # return nothing or set the bozo flag.
        if hasattr(feed, 'bozo') and feed.bozo == 1:
            print("Warning: Existing XML file might be corrupted. Ignoring old items.")
            # Option 1: `return []` here to discard the corrupted history
            # and start fresh.
            # return []
            # Option 2 (current behavior): fall through and keep whatever
            # entries were still parseable (depends on where the damage is).
        entries = []
        for entry in feed.entries:
            pub_struct = entry.get('published_parsed')
            pub_date = convert_struct_time_to_datetime(pub_struct)
            entries.append({
                'title': entry.get('title', ''),
                'link': entry.get('link', ''),
                'pub_date': pub_date,
                'summary': entry.get('summary', ''),
                'journal': entry.get('author', ''),
                'id': entry.get('id', entry.get('link', '')),
                'is_old': True
            })
        return entries
    except Exception as e:
        print(f"Error reading existing file: {e}")
        return []  # if the old file is unreadable, behave like a first run

def match_entry(entry, queries):
    """Return True if any query matches; 'AND' in a query requires all keywords."""
    text_to_search = (entry['title'] + " " + entry['summary']).lower()
    for query in queries:
        keywords = [k.strip().lower() for k in query.split('AND')]
        match = True
        for keyword in keywords:
            if keyword not in text_to_search:
                match = False
                break
        if match:
            return True
    return False
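
# For example (hypothetical entries), the query "graphene AND transport"
# matches only when both substrings occur in title + summary:
#   match_entry({'title': 'Graphene devices', 'summary': 'ballistic transport'},
#               ["graphene AND transport"])  # -> True
#   match_entry({'title': 'Graphene devices', 'summary': 'optics'},
#               ["graphene AND transport"])  # -> False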

def generate_rss_xml(items):
    """Generate the RSS 2.0 XML file (with illegal XML characters stripped)."""
    rss_items = []
    items.sort(key=lambda x: x['pub_date'], reverse=True)
    items = items[:MAX_ITEMS]
    for item in items:
        title = item['title']
        if not item.get('is_old', False):
            title = f"[{item['journal']}] {item['title']}"
        # --- Key step: sanitize the text fields before writing XML ---
        clean_title = remove_illegal_xml_chars(title)
        clean_summary = remove_illegal_xml_chars(item['summary'])
        clean_journal = remove_illegal_xml_chars(item['journal'])
        # --------------------------------------------------------------
        rss_item = Item(
            title=clean_title,
            link=item['link'],
            description=clean_summary,
            author=clean_journal,
            guid=Guid(item['id']),
            pubDate=item['pub_date']
        )
        rss_items.append(rss_item)
    feed = Feed(
        title="My Customized Papers",
        link="https://github.com/your_username/your_repo",
        description="Aggregated research papers",
        language="en-US",
        lastBuildDate=datetime.datetime.now(),
        items=rss_items
    )
    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        f.write(feed.rss())
    print(f"Successfully generated {OUTPUT_FILE} with {len(rss_items)} items.")

def main():
    # Make sure these arguments match your current secrets configuration.
    rss_urls = load_config('journals.dat', 'RSS_JOURNALS')
    queries = load_config('keywords.dat', 'RSS_KEYWORDS')
    if not rss_urls or not queries:
        print("Error: Configuration files are empty or missing.")
        return
    existing_entries = get_existing_items()
    seen_ids = set(entry['id'] for entry in existing_entries)
    all_entries = existing_entries.copy()
    new_count = 0
    print("Starting RSS fetch from remote...")
    for url in rss_urls:
        fetched_entries = parse_rss(url)
        for entry in fetched_entries:
            if entry['id'] in seen_ids:
                continue
            if match_entry(entry, queries):
                all_entries.append(entry)
                seen_ids.add(entry['id'])
                new_count += 1
                print(f"Match found: {entry['title'][:50]}...")
    print(f"Added {new_count} new entries.")
    generate_rss_xml(all_entries)

if __name__ == '__main__':
    main()
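
# Typical invocation (a sketch; the exact setup depends on your CI secrets,
# and the URLs and keywords below are placeholders):
#   RSS_JOURNALS='https://example.org/a.xml;https://example.org/b.xml' \
#   RSS_KEYWORDS='graphene AND transport;perovskite' \
#   python get_RSS.py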