Files
XijiaX/crawler.py
2025-07-15 10:52:00 +08:00

232 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from PIL import Image
from io import BytesIO
import feedparser
from aiohttp_retry import RetryClient, ExponentialRetry
from aiohttp import ClientTimeout
from config import Config
class RSSCrawler:
    """Async crawler for an RSS feed of limited-time-free app offers."""

    def __init__(self):
        # Random User-Agent per request so we look like a normal browser.
        self.ua = UserAgent()
        # Up to 3 attempts with exponential backoff between 1s and 10s.
        backoff = ExponentialRetry(attempts=3, start_timeout=1, max_timeout=10)
        # 30s total budget per request, 10s to establish the connection.
        limits = ClientTimeout(total=30, connect=10)
        self.session = RetryClient(retry_options=backoff, timeout=limits)
async def __aenter__(self):
    """Enter `async with`: no extra setup; the session was built in __init__."""
    return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Exit `async with`: close the aiohttp session (and its connector)."""
    await self.session.close()
    print("已关闭aiohttp会话和连接器")
def parse_date(self, date_str):
    """Return the last Chinese-format date/time found in *date_str*.

    HTML tags are stripped first. Matches look like
    ``YYYY年M月D日`` with optional ``H时M分S秒`` suffix parts; the
    last occurrence is returned because it is usually the deadline
    of the free offer. Returns None when no date is present.
    """
    # Drop HTML tags but keep the text between them.
    plain = re.sub(r'<[^>]+>', '', date_str)
    # Year/month/day plus optional hour/minute/second components.
    date_re = re.compile(
        r'[0-9]{4}年[0-9]{1,2}月[0-9]{1,2}日'
        r'(?:[0-9]{1,2}时)?(?:[0-9]{1,2}分)?(?:[0-9]{1,2}秒)?'
    )
    found = date_re.findall(plain)
    if not found:
        print("未找到限免时间信息")
        return None
    print(f"找到所有限免时间:{found}")
    return found[-1]
def parse_serial_key(self, content):
    """Extract the first serial number / registration key from HTML content.

    Strips markup, then tries key-shaped patterns in priority order:
    dash-grouped keys first, then bare 16-32 char alphanumeric tokens.
    Returns the first match, or None when nothing key-like is found.
    """
    # Replace tags with a space (not '') so text from adjacent elements
    # cannot fuse into a false key, then collapse runs of whitespace.
    text = re.sub(r'<[^>]+>', ' ', content)
    text = re.sub(r'\s+', ' ', text).strip()
    # Patterns, most specific first:
    # 1. dash-grouped keys: 3-6 groups of 4-5 chars (XXXX-XXXX-XXXX...).
    #    (A former second pattern with {3,4} groups was dead code — fully
    #    subsumed by this {2,5} one — and has been removed.)
    # 2. a single 16-32 char alphanumeric token containing at least one
    #    digit AND one letter. The lookaheads are anchored inside the
    #    token via \b plus character-class runs; the previous
    #    `(?=.*[0-9])(?=.*[A-Z])` form scanned the REST OF THE TEXT, so an
    #    all-letter word matched whenever any digit appeared later on.
    patterns = [
        r'[A-Z0-9]{4,5}(?:-[A-Z0-9]{4,5}){2,5}',
        r'\b(?=[A-Z]*[0-9])(?=[0-9]*[A-Z])[A-Z0-9]{16,32}\b',
    ]
    for pattern in patterns:
        keys = [m.group(0) for m in re.finditer(pattern, text)]
        if keys:
            print(f"找到序列号/注册码:{', '.join(keys)}")
            return keys[0]  # first match wins
    print("未找到序列号/注册码")
    return None
def extract_download_urls(self, content):
    """Collect external download links from an HTML fragment.

    Skips empty hrefs, in-page anchors, apprcn-internal links and Quark
    netdisk links; every remaining href is treated as a download URL.
    Returns the (possibly empty) list of URLs in document order.
    """
    soup = BeautifulSoup(content, 'html.parser')
    urls = []
    # Only anchors that actually carry an href attribute.
    for anchor in soup.find_all('a', href=True):
        try:
            href = anchor['href'].strip()
            if not href or href.startswith('#'):
                continue  # empty or same-page anchor
            if 'free.apprcn.com' in href or 'pan.quark.cn' in href:
                continue  # site-internal / Quark netdisk — not a download
            urls.append(href)
            print(f"找到下载链接:{href}")
        except Exception as e:
            print(f"处理链接时出错:{str(e)}")
            continue
    if not urls:
        print("未找到任何下载链接")
    return urls
async def download_image(self, url, title):
    """Download an image and save it under Config.IMAGE_SAVE_PATH as JPEG.

    The file name is derived from *title* (unsafe characters removed,
    whitespace collapsed to underscores, capped at 100 chars). Images with
    transparency are flattened onto a white background so they can be
    stored as JPEG. Returns the local save path on success, or None on any
    failure (non-200 status, undecodable image data, I/O errors).
    """
    try:
        if not os.path.exists(Config.IMAGE_SAVE_PATH):
            os.makedirs(Config.IMAGE_SAVE_PATH)
            print(f"创建图片保存目录:{Config.IMAGE_SAVE_PATH}")
        # Build a filesystem-safe name from the title while keeping as much
        # of the original title as possible.
        safe_filename = re.sub(r'[<>:"\\|?*\[\]]', '', title)  # drop unsafe characters
        safe_filename = safe_filename.strip()  # trim surrounding whitespace
        safe_filename = re.sub(r'\s+', '_', safe_filename)  # whitespace -> underscores
        safe_filename = safe_filename[:100]  # cap length, still informative
        print(f"开始下载图片:{url}")
        async with self.session.get(url, headers={'User-Agent': self.ua.random}) as response:
            if response.status == 200:
                data = await response.read()
                img = Image.open(BytesIO(data))
                # Normalize to RGB so the result can be written as JPEG.
                if img.format and img.format.lower() == 'webp':
                    img = img.convert('RGB')
                elif img.mode in ['RGBA', 'LA']:
                    # Flatten the alpha channel onto a white background.
                    background = Image.new('RGB', img.size, (255, 255, 255))
                    if img.mode == 'RGBA':
                        background.paste(img, mask=img.split()[3])  # band 3 = alpha
                    else:
                        background.paste(img, mask=img.split()[1])  # LA: band 1 = alpha
                    img = background
                elif img.mode != 'RGB':
                    img = img.convert('RGB')
                save_path = os.path.join(Config.IMAGE_SAVE_PATH, f"{safe_filename}.jpg")
                img.save(save_path, 'JPEG', quality=95)
                print(f"图片成功下载并保存到:{save_path}")
                return save_path
            else:
                # NOTE(review): only 200 counts as success; other 2xx codes
                # fall through here — confirm that is intended.
                print(f"图片下载失败HTTP状态码{response.status}URL{url}")
    except Exception as e:
        print(f"图片下载过程发生错误:{str(e)}URL{url}")
    # Reached on non-200 responses and on any exception above.
    return None
async def fetch_rss(self):
    """Fetch the RSS feed and process up to Config.RSS_LIMIT entries.

    An entry is kept only if it has at least one <img> tag, at least one
    download link, and one of its images downloads successfully. For each
    kept entry the free-offer deadline and any serial key are parsed from
    the content. Returns a list of item dicts (possibly empty).

    NOTE(review): the `finally` block below closes self.session, so a
    crawler instance can only fetch once — and __aexit__ closes the same
    session again. Confirm this double-close / single-use design is
    intended.
    """
    print("开始获取RSS订阅内容...")
    items = []
    try:
        headers = {'User-Agent': self.ua.random}
        async with self.session.get(Config.RSS_URL, headers=headers) as response:
            if response.status != 200:
                print(f"RSS获取失败HTTP状态码{response.status}")
                return []
            content = await response.text()
        feed = feedparser.parse(content)
        if not feed.entries:
            print("RSS内容解析失败或内容为空")
            return []
        print(f"共获取到 {len(feed.entries)} 条内容,将处理前 {Config.RSS_LIMIT} 条")
        # Only the newest Config.RSS_LIMIT entries are processed.
        for i, entry in enumerate(feed.entries[:Config.RSS_LIMIT], 1):
            print(f"\n开始处理第 {i} 条内容:{entry.title}")
            try:
                # Prefer the full article content, fall back to description.
                content = ''
                if hasattr(entry, 'content'):
                    content = entry.content[0].value
                elif hasattr(entry, 'description'):
                    content = entry.description
                soup = BeautifulSoup(content, 'html.parser')
                # Collect every <img> tag; entries without images are skipped.
                all_img_tags = soup.find_all('img')
                if not all_img_tags:
                    print(f"第 {i} 条内容没有找到图片标签,跳过处理")
                    continue
                # Extract download links; entries without any are skipped.
                download_urls = self.extract_download_urls(content)
                if not download_urls:
                    print(f"第 {i} 条内容没有找到下载链接,跳过处理")
                    continue
                # Try each image in order until one downloads successfully.
                local_image = None
                for img_tag in all_img_tags:
                    if 'src' not in img_tag.attrs:
                        continue
                    img_url = img_tag['src']
                    if not img_url.startswith('http'):
                        continue
                    local_image = await self.download_image(img_url, entry.title)
                    if local_image:
                        break
                if not local_image:
                    print(f"第 {i} 条内容图片下载失败,跳过处理")
                    continue
                # Parse the free-offer deadline and serial key (may be None).
                free_time = self.parse_date(content)
                serial_key = self.parse_serial_key(content)
                # Assemble the article info dict.
                item = {
                    'title': entry.title,
                    'pub_date': getattr(entry, 'published', '未知发布时间'),
                    'content': content,
                    'download_urls': download_urls,
                    'free_time': free_time,
                    'serial_key': serial_key,
                    'local_image': local_image,
                    'url': entry.link if hasattr(entry, 'link') else download_urls[0]  # fall back to first download link
                }
                print("\n文章信息获取成功:")
                print(f"- 标题:{item['title']}")
                print(f"- 发布时间:{item['pub_date']}")
                print(f"- 限免时间:{item['free_time']}")
                print(f"- 序列号:{item['serial_key']}")
                print("- 下载链接:")
                for url in item['download_urls']:
                    print(f"  {url}")
                print(f"- 本地图片路径:{item['local_image']}")
                items.append(item)
            except Exception as e:
                # Per-entry failures are logged and skipped, not fatal.
                print(f"处理第 {i} 条内容时出错:{str(e)}")
                continue
    except Exception as e:
        print(f"RSS获取过程中发生错误{str(e)}")
    finally:
        # Always close the session when this method finishes
        # (see the NOTE in the docstring about single-use behavior).
        await self.session.close()
        print("已关闭aiohttp会话")
    # Partial results are returned even after an outer exception.
    print(f"\n成功处理 {len(items)} 条内容")
    return items