diff --git a/get_RSS.py b/get_RSS.py
index b0d2231..85175ee 100644
--- a/get_RSS.py
+++ b/get_RSS.py
@@ -7,45 +7,48 @@ from rfeed import Item, Feed, Guid
 from email.utils import parsedate_to_datetime

 # --- Configuration ---
-OUTPUT_FILE = "filtered_feed.xml"  # output file
-MAX_ITEMS = 10000  # maximum number of items kept in the RSS feed (rolling window)
-JOURNALS_FILE = 'journals.dat'
-KEYWORDS_FILE = 'keywords.dat'
+OUTPUT_FILE = "filtered_feed.xml"
+MAX_ITEMS = 1000
 # ----------------

 def load_config(filename, env_var_name=None):
-    """
-    Read config from an environment variable first (keeps secrets private on GitHub Actions);
-    if the variable is not set, fall back to a local file (for local testing).
-    """
-    # 1. Try the environment variable (Secrets)
+    """(Keep your previous load_config code unchanged.)"""
+    # ... keep the load_config function you previously modified for privacy ...
+    # Shortened here for brevity; reuse your current load_config directly.
     if env_var_name and os.environ.get(env_var_name):
         print(f"Loading config from environment variable: {env_var_name}")
-        # Assume the variable uses semicolons or newlines as separators
         content = os.environ[env_var_name]
-        # Accept either newline- or semicolon-separated values
         if '\n' in content:
             return [line.strip() for line in content.split('\n') if line.strip()]
         else:
             return [line.strip() for line in content.split(';') if line.strip()]

-    # 2. Fall back to the local file
     if os.path.exists(filename):
         print(f"Loading config from local file: {filename}")
         with open(filename, 'r', encoding='utf-8') as f:
             return [line.strip() for line in f if line.strip() and not line.startswith('#')]
-    print(f"Warning: No config found for {filename} or {env_var_name}")
     return []

+# --- New: sanitizer for characters that are illegal in XML ---
+def remove_illegal_xml_chars(text):
+    """
+    Remove the ASCII control characters that XML 1.0 does not allow
+    (char values 0-8, 11-12, 14-31).
+    """
+    if not text:
+        return ""
+    # Regex matching the control characters ASCII 0-8, 11, 12, 14-31.
+    # \x09 (tab), \x0a (newline), and \x0d (carriage return) are legal, so they are kept.
+    illegal_chars = r'[\x00-\x08\x0b\x0c\x0e-\x1f]'
+    return re.sub(illegal_chars, '', text)
+
 def convert_struct_time_to_datetime(struct_time):
-    """Convert a feedparser time struct to a datetime object."""
     if not struct_time:
         return datetime.datetime.now()
     return datetime.datetime.fromtimestamp(time.mktime(struct_time))

 def parse_rss(rss_url, retries=3):
-    """Parse a remote RSS feed."""
+    # (unchanged)
     print(f"Fetching: {rss_url}...")
     for attempt in range(retries):
         try:
@@ -54,7 +57,6 @@ def parse_rss(rss_url, retries=3):
             journal_title = feed.feed.get('title', 'Unknown Journal')

             for entry in feed.entries:
-                # Get the normalized publication time
                 pub_struct = entry.get('published_parsed', entry.get('updated_parsed'))
                 pub_date = convert_struct_time_to_datetime(pub_struct)

@@ -64,7 +66,7 @@ def parse_rss(rss_url, retries=3):
                     'pub_date': pub_date,
                     'summary': entry.get('summary', entry.get('description', '')),
                     'journal': journal_title,
-                    'id': entry.get('id', entry.get('link', ''))  # ID used for deduplication
+                    'id': entry.get('id', entry.get('link', ''))
                 })
             return entries
         except Exception as e:
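
A quick sanity check of the new remove_illegal_xml_chars helper may be useful. The sketch below is standalone and duplicates the character class from the hunk above rather than importing get_RSS.py; ILLEGAL_XML_CHARS and the sample strings are illustrative names, not part of the patch.

```python
import re

# Same character class as remove_illegal_xml_chars: strip the C0 control
# characters except tab (\x09), newline (\x0a), and carriage return (\x0d).
ILLEGAL_XML_CHARS = re.compile(r'[\x00-\x08\x0b\x0c\x0e-\x1f]')

samples = {
    "form feed": "Spin\x0cglass",  # \x0c is illegal in XML 1.0 -> removed
    "tab":       "col1\tcol2",     # \x09 is legal -> kept
    "newline":   "line1\nline2",   # \x0a is legal -> kept
}
for name, text in samples.items():
    print(f"{name}: {ILLEGAL_XML_CHARS.sub('', text)!r}")
# form feed: 'Spinglass'
# tab: 'col1\tcol2'
# newline: 'line1\nline2'
```
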
Ignoring old items.") + # 这里可以选择 return [] 直接丢弃坏掉的旧数据,重新开始 + # return [] + # 或者尝试读取能读的部分(取决于损坏位置) + entries = [] for entry in feed.entries: - # 恢复 datetime 对象 pub_struct = entry.get('published_parsed') pub_date = convert_struct_time_to_datetime(pub_struct) - # 注意:生成的 XML 标题通常是 "[Journal] Title",这里我们需要尽量保持原样 - # 或者为了简单起见,我们直接存储读取到的内容 entries.append({ - 'title': entry.get('title', ''), # 这里标题已经包含 [Journal] 前缀了 + 'title': entry.get('title', ''), 'link': entry.get('link', ''), 'pub_date': pub_date, 'summary': entry.get('summary', ''), - 'journal': entry.get('author', ''), # 我们在生成时把 journal 存入了 author 字段 + 'journal': entry.get('author', ''), 'id': entry.get('id', entry.get('link', '')), - 'is_old': True # 标记为旧数据,不需要再次关键词匹配 + 'is_old': True }) return entries except Exception as e: print(f"Error reading existing file: {e}") - return [] + return [] # 如果旧文件读不了,就当做第一次运行 def match_entry(entry, queries): - """关键词匹配""" - # 构造待搜索文本 + # (保持不变) text_to_search = (entry['title'] + " " + entry['summary']).lower() - for query in queries: keywords = [k.strip().lower() for k in query.split('AND')] match = True for keyword in keywords: - # 使用简单的字符串包含判断,比正则更快,且对科研关键词通常足够 if keyword not in text_to_search: match = False break @@ -121,37 +123,37 @@ def match_entry(entry, queries): return False def generate_rss_xml(items): - """生成 RSS 2.0 XML 文件""" + """生成 RSS 2.0 XML 文件 (已加入非法字符清洗)""" rss_items = [] - # 按时间倒序排列(最新的在最前) - # 确保所有 item 都有 pub_date 且是 datetime 对象 items.sort(key=lambda x: x['pub_date'], reverse=True) - - # 截取最新的 MAX_ITEMS 条 items = items[:MAX_ITEMS] for item in items: - # 如果是旧数据,标题可能已经是 "[Journal] Title" 格式,需要避免重复添加前缀 title = item['title'] if not item.get('is_old', False): - # 新数据,添加期刊前缀 title = f"[{item['journal']}] {item['title']}" + # --- 关键修改:清洗数据 --- + clean_title = remove_illegal_xml_chars(title) + clean_summary = remove_illegal_xml_chars(item['summary']) + clean_journal = remove_illegal_xml_chars(item['journal']) + # ----------------------- + rss_item = Item( - title = title, + title = clean_title, link = item['link'], - description = item['summary'], - author = item['journal'], # 借用 author 字段存储期刊名 + description = clean_summary, + author = clean_journal, guid = Guid(item['id']), pubDate = item['pub_date'] ) rss_items.append(rss_item) feed = Feed( - title = "My Customized Papers (Auto-Filtered)", + title = "My Customized Papers", link = "https://github.com/your_username/your_repo", - description = "Aggregated research papers based on keywords", + description = "Aggregated research papers", language = "en-US", lastBuildDate = datetime.datetime.now(), items = rss_items @@ -162,41 +164,34 @@ def generate_rss_xml(items): print(f"Successfully generated {OUTPUT_FILE} with {len(rss_items)} items.") def main(): - # 1. 读取配置 - rss_urls = load_config('journals.dat', 'RSS_JOURNALS') + # 请确保这里的调用参数与你目前的 secrets 配置一致 + rss_urls = load_config('journals.dat', 'RSS_JOURNALS') queries = load_config('keywords.dat', 'RSS_KEYWORDS') if not rss_urls or not queries: print("Error: Configuration files are empty or missing.") return - # 2. 读取旧数据(核心去重策略:保留历史) existing_entries = get_existing_items() - # 创建一个已有 ID 的集合,用于快速查重 seen_ids = set(entry['id'] for entry in existing_entries) all_entries = existing_entries.copy() new_count = 0 - # 3. 
@@ -162,41 +164,34 @@
     print(f"Successfully generated {OUTPUT_FILE} with {len(rss_items)} items.")

 def main():
-    # 1. Load the config
-    rss_urls = load_config('journals.dat', 'RSS_JOURNALS')
+    # Make sure these arguments match your current secrets configuration
+    rss_urls = load_config('journals.dat', 'RSS_JOURNALS')
     queries = load_config('keywords.dat', 'RSS_KEYWORDS')

     if not rss_urls or not queries:
         print("Error: Configuration files are empty or missing.")
         return

-    # 2. Load the old data (core dedup strategy: keep history)
     existing_entries = get_existing_items()
-    # Build a set of existing IDs for fast duplicate checks
     seen_ids = set(entry['id'] for entry in existing_entries)

     all_entries = existing_entries.copy()
     new_count = 0

-    # 3. Fetch the new data
     print("Starting RSS fetch from remote...")
     for url in rss_urls:
         fetched_entries = parse_rss(url)
         for entry in fetched_entries:
-            # Dedup: skip if the ID is already in the old data
             if entry['id'] in seen_ids:
                 continue

-            # Keyword matching
             if match_entry(entry, queries):
                 all_entries.append(entry)
                 seen_ids.add(entry['id'])
                 new_count += 1
                 print(f"Match found: {entry['title'][:50]}...")

-    print(f"Added {new_count} new entries. Total entries before limit: {len(all_entries)}")
-
-    # 4. Generate the new file (includes sorting and truncation)
+    print(f"Added {new_count} new entries.")
     generate_rss_xml(all_entries)

 if __name__ == '__main__':
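
For reference, the query semantics main() relies on are unchanged by this patch: each line from keywords.dat is split on the literal token AND, and every resulting keyword must appear as a case-insensitive substring of title + summary. A self-contained illustration with made-up entry data:

```python
# Made-up entry and queries, only to illustrate match_entry's rule.
entry = {"title": "Quantum spin liquids", "summary": "Review of frustrated magnets."}
queries = ["spin AND magnet", "topological insulator"]

text = (entry["title"] + " " + entry["summary"]).lower()
for q in queries:
    keywords = [k.strip().lower() for k in q.split("AND")]
    print(q, "->", all(k in text for k in keywords))
# spin AND magnet -> True   (both substrings occur)
# topological insulator -> False
```

One caveat: str.split("AND") also fires on "AND" embedded inside longer uppercase words, so a query line like "RANDOM matrices" would be split into "R" and "OM matrices".
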