代码发布

This commit is contained in:
fungj
2025-07-15 10:52:00 +08:00
commit af8cf40331
15 changed files with 835 additions and 0 deletions

22
.gitignore vendored Normal file
View File

@@ -0,0 +1,22 @@
# Python
__pycache__/
*.py[cod]
*$py.class
# Database
articles.db
# Images
images/
# Environment
.env
# IDE
.vscode/
.idea/
# Virtual environment
.venv/
venv/
ENV/

22
Pipfile Normal file
View File

@@ -0,0 +1,22 @@
[[source]]
url = "https://mirrors.aliyun.com/pypi/simple"
verify_ssl = true
name = "pip_conf_index_global"
[packages]
python-dateutil = "*"
requests = "*"
aiohttp-retry = "*"
fake-useragent = "*"
feedparser = "*"
beautifulsoup4 = "*"
aiofiles = "*"
pillow = "*"
aiohttp = "*"
openai = "*"
python-wordpress-xmlrpc = "*"
[dev-packages]
[requires]
python_version = "3.13"

94
README.md Normal file
View File

@@ -0,0 +1,94 @@
# 喜加X自动发布系统
## 项目简介
这是我之前开发的一个脚本项目。本项目是一个自动化工具,用于获取软件限免信息并自动发布到WordPress网站。系统会自动抓取限免软件信息,使用AI进行内容分析和优化,然后将处理后的内容发布到指定的WordPress站点。
WordPress XML-RPC的配置和设置方法请自行查询并设置。
## 系统功能
### 1. 内容抓取
- 自动获取软件限免信息
- 支持多个来源的内容聚合
- 自动下载和保存相关图片
### 2. AI内容分析
- 使用AI技术分析和优化文章内容
- 自动生成优化后的文章标题和描述
- 智能分类内容类型
### 3. WordPress发布
- 自动发布文章到WordPress
- 支持图片上传
- 自动设置文章分类
- 支持限免时间和序列号信息的格式化展示
## 配置说明
### 1. 基础配置
`config.py`文件中配置以下信息:
```python
# WordPress配置
WORDPRESS_URL = "你的WordPress XML-RPC地址"
WORDPRESS_USERNAME = "WordPress用户名"
WORDPRESS_PASSWORD = "WordPress密码"
# AI配置
AI_API_KEY = "你的AI API密钥"
```
### 2. 数据库配置
系统使用SQLite数据库存储文章信息,数据库文件为`articles.db`,无需额外配置。
## 部署说明
### 1. 环境要求
- Python 3.x
- pipenv(推荐使用虚拟环境)
### 2. 安装依赖
```bash
pipenv install
```
### 3. 运行系统
```bash
pipenv run python main.py
```
## 目录结构
```
├── main.py # 主程序入口
├── config.py # 配置文件
├── crawler.py # 内容抓取模块
├── ai_analyzer.py # AI内容分析模块
├── wp_publisher.py # WordPress发布模块
├── db_manager.py # 数据库管理模块
├── articles.db # SQLite数据库文件
├── images/ # 图片存储目录
├── Pipfile # 依赖管理文件
└── Pipfile.lock # 依赖版本锁定文件
```
## 注意事项
1. 首次运行前请确保已正确配置所有必要的API密钥和WordPress访问信息
2. 建议定期备份`articles.db`数据库文件
3. 图片文件会自动保存在`images`目录下
4. 系统会自动处理重复内容,避免重复发布
## 功能扩展
系统设计采用模块化结构,可以方便地扩展新功能:
1. 添加新的内容源:扩展`crawler.py`
2. 自定义AI处理逻辑修改`ai_analyzer.py`
3. 调整发布格式:修改`wp_publisher.py`
## 许可证
MIT License

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

132
ai_analyzer.py Normal file
View File

@@ -0,0 +1,132 @@
import os
import json
import aiohttp
import asyncio
import re
from config import Config
from db_manager import DatabaseManager
class AIContentAnalyzer:
    """Classify an article title (game / software / mobile app) and generate a
    short review via the Deepseek chat API, then map the result onto the
    site's WordPress category ids."""

    def __init__(self):
        # Deepseek endpoint/credentials come from the static Config class.
        self.api_key = Config.DEEPSEEK_API_KEY
        self.api_url = Config.DEEPSEEK_API_URL
        self.db_manager = DatabaseManager()

    async def analyze_content(self, title, url=None):
        """Analyze *title* with Deepseek.

        Returns a dict with keys 'type', 'category_id', 'parent_category_id',
        'title' (optimized) and 'review'.  When *url* is given, the result is
        also recorded in the tracking database.  Raises Exception when the
        API call fails.
        """
        print(f"\n开始AI分析内容{title}")
        prompt = f"""请完成以下任务:
1. 分析标题:{title}
判断这是游戏、软件还是APP。如果是APP请进一步判断是Android应用还是APP游戏。
请严格按照以下类型返回:
- 如果是手机游戏,返回"APP游戏"
- 如果是Android应用返回"Android应用"
- 如果是苹果IOS返回"IOS"
- 如果是电脑游戏,返回"游戏"
- 如果是电脑软件,返回"软件"
2. 优化标题格式:
如果是游戏,改写为'喜加一[如steamepic]游戏 游戏名称'的格式。
如果是字体软件,改写为'字体名称+字体类型'的格式。
其他软件,改写为'软件名称+软件类型(如视频编辑器软件、修图软件等)'的格式。
3. 生成软件评测:
请对这个软件或游戏进行一个专业的评测和介绍字数限制在500个字符内用一段话介绍即可。请按以下格式返回
类型:[软件/游戏/APP类型]
标题:[格式化后的标题]
评测:[软件评测内容]"""
        print("正在调用Deepseek AI进行内容分析...")
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}"
        }
        data = {
            "model": "deepseek-chat",
            "messages": [
                {
                    "role": "system",
                    "content": "你是一位限时免费游戏和软件资源分享博客的博主。"
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "temperature": 0,  # deterministic classification output
            "max_tokens": 1000
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.api_url, headers=headers, json=data) as response:
                    if response.status == 200:
                        # Keep the raw API payload and the parsed fields in
                        # separate names (the original reused `result` for both).
                        api_result = await response.json()
                        content = api_result['choices'][0]['message']['content']
                        print("Deepseek AI响应成功正在解析结果...")
                    else:
                        error_text = await response.text()
                        raise Exception(f"API调用失败{error_text}")
            print(f"AI响应内容{content}")
            parsed = self._parse_response(content, title)
            print("\nAI分析结果")
            print(f"- 内容类型:{parsed['type']} (分类ID: {parsed['category_id']})")
            print(f"- 优化标题:{parsed['title']}")
            print(f"- 评测内容:{parsed['review']}")
            if url:
                await self.db_manager.add_processed_article(
                    url=url,
                    title=title,
                    content_type=parsed['type'],
                    optimized_title=parsed['title']
                )
            return parsed
        except Exception as e:
            error_msg = f"AI分析失败: {str(e)}"
            print(f"\n错误:{error_msg}")
            raise Exception(error_msg)

    def _parse_response(self, content, title):
        """Parse the model reply into the result dict.

        Expected reply lines: 类型:[...], 标题:[...], 评测:[...] — optionally
        prefixed with an ordinal like "1. " and written with either a
        full-width (:) or ASCII (:) colon.
        """
        # Defaults used when a field is missing from the reply.
        parsed = {
            'type': '软件',
            'category_id': 2,
            'parent_category_id': None,
            'title': title,
            'review': '',
        }
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            # Strip a leading ordinal such as "1. ".  The dot is escaped: the
            # original pattern r'^\d+.\s*' matched ANY character after the
            # digits and could swallow part of the field name.
            line = re.sub(r'^\d+\.\s*', '', line)
            # Field value = everything after the first colon (full-width or
            # ASCII), with surrounding square brackets removed.  The original
            # used line.split('') — an empty separator, which raises
            # ValueError at runtime.
            value = re.split(r'[::]', line, maxsplit=1)[-1].strip().strip('[]')
            if line.startswith(('类型:', '类型:')):
                self._apply_content_type(parsed, value)
            elif line.startswith(('标题:', '标题:')):
                parsed['title'] = value
            elif line.startswith(('评测:', '评测:')):
                parsed['review'] = value
        return parsed

    def _apply_content_type(self, parsed, content_type):
        """Map the AI-reported type string to WordPress category ids (order of
        checks matters: 'app游戏' must be tested before the bare '游戏')."""
        lowered = content_type.lower()
        if 'ios' in lowered:
            parsed['type'] = 'IOS'
            parsed['category_id'] = 434
            parsed['parent_category_id'] = 3
        elif 'app游戏' in lowered:
            parsed['type'] = '手机游戏'
            parsed['category_id'] = 56
            parsed['parent_category_id'] = 3
        elif 'android' in lowered:
            parsed['type'] = 'Android应用'
            parsed['category_id'] = 57
            parsed['parent_category_id'] = 3
        elif '游戏' in lowered:
            parsed['type'] = '游戏'
            parsed['category_id'] = 1
        else:
            parsed['type'] = '软件'
            parsed['category_id'] = 2

BIN
articles.db Normal file

Binary file not shown.

17
config.py Normal file
View File

@@ -0,0 +1,17 @@
# Application-wide configuration constants.
class Config:
    """Static configuration for the RSS crawler, Deepseek AI and WordPress."""

    # RSS feed configuration
    RSS_URL = "https://free.apprcn.com/feed/"
    RSS_LIMIT = 3  # only the newest 3 feed entries are processed per round

    # WordPress XML-RPC endpoint and credentials (fill in before deploying)
    WORDPRESS_URL = "https://你的站点/xmlrpc.php"
    WORDPRESS_USERNAME = "用户名"
    WORDPRESS_PASSWORD = "密码"

    # Deepseek AI API credentials and chat-completions endpoint
    DEEPSEEK_API_KEY = "Deepseek key"
    DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"  # Deepseek API endpoint

    # Local directory where downloaded article images are stored
    IMAGE_SAVE_PATH = "images"

232
crawler.py Normal file
View File

@@ -0,0 +1,232 @@
import os
import re
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from PIL import Image
from io import BytesIO
import feedparser
from aiohttp_retry import RetryClient, ExponentialRetry
from aiohttp import ClientTimeout
from config import Config
class RSSCrawler:
    """Fetches the free-app RSS feed, downloads article images and extracts
    giveaway metadata (deadline, serial keys, download links)."""

    def __init__(self):
        # Random User-Agent per request to look less like a bot.
        self.ua = UserAgent()
        # Retry transient HTTP failures with exponential back-off (3 attempts).
        retry_options = ExponentialRetry(attempts=3, start_timeout=1, max_timeout=10)
        timeout = ClientTimeout(total=30, connect=10)
        self.session = RetryClient(retry_options=retry_options, timeout=timeout)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # NOTE(review): fetch_rss() also closes the session in its finally
        # block, so this may run against an already-closed session — aiohttp's
        # close() is idempotent, so that appears harmless.  Confirm intended.
        await self.session.close()
        print("已关闭aiohttp会话和连接器")

    def parse_date(self, date_str):
        """Extract the giveaway deadline from HTML content.

        Returns the LAST date-like match (assumed to be the expiry time),
        e.g. "2025年7月15日10时", or None when nothing matches.
        """
        # Drop HTML tags but keep the text inside them.
        text = re.sub(r'<[^>]+>', '', date_str)
        # Match "YYYY年M月D日" with optional 时/分/秒 components.
        pattern = r'([0-9]{4}年[0-9]{1,2}月[0-9]{1,2}日(?:[0-9]{1,2}时)?(?:[0-9]{1,2}分)?(?:[0-9]{1,2}秒)?)'
        matches = re.finditer(pattern, text)
        dates = [match.group(1) for match in matches]
        if dates:
            print(f"找到所有限免时间:{dates}")
            # The last occurrence is usually the end of the free period.
            return dates[-1]
        print("未找到限免时间信息")
        return None

    def parse_serial_key(self, content):
        """Find a license/serial key in the HTML content.

        Returns the first match of the first pattern that hits, or None.
        """
        # Replace tags with spaces (not empty strings) so adjacent text does
        # not fuse together, then collapse runs of whitespace.
        text = re.sub(r'<[^>]+>', ' ', content)
        text = re.sub(r'\s+', ' ', text).strip()
        # Common key shapes:
        #   1) dash-grouped activation codes with 3-6 groups
        #   2) dash-grouped serials with 4-5 groups (subset of pattern 1)
        #   3) 16-32 char alphanumeric blobs containing letters AND digits
        patterns = [
            r'[A-Z0-9]{4,5}(?:-[A-Z0-9]{4,5}){2,5}',
            r'[A-Z0-9]{4,5}(?:-[A-Z0-9]{4,5}){3,4}',
            r'(?=.*[0-9])(?=.*[A-Z])[A-Z0-9]{16,32}'
        ]
        for pattern in patterns:
            matches = re.finditer(pattern, text)
            keys = [match.group(0) for match in matches]
            if keys:
                print(f"找到序列号/注册码:{', '.join(keys)}")
                return keys[0]
        print("未找到序列号/注册码")
        return None

    def extract_download_urls(self, content):
        """Collect candidate download links from the article HTML.

        Skips empty/anchor links, the source site's own internal links and
        Quark netdisk shares.  Returns a (possibly empty) list of URLs.
        """
        soup = BeautifulSoup(content, 'html.parser')
        urls = []
        # Only anchors that actually carry an href attribute.
        for a in soup.find_all('a', href=True):
            try:
                url = a['href'].strip()
                if url and not url.startswith('#') and 'free.apprcn.com' not in url and 'pan.quark.cn' not in url:
                    urls.append(url)
                    print(f"找到下载链接:{url}")
            except Exception as e:
                print(f"处理链接时出错:{str(e)}")
                continue
        if not urls:
            print("未找到任何下载链接")
        return urls

    async def download_image(self, url, title):
        """Download *url*, normalise it to RGB JPEG and save it under
        Config.IMAGE_SAVE_PATH with a sanitised version of *title* as the
        file name.  Returns the saved path, or None on any failure."""
        try:
            if not os.path.exists(Config.IMAGE_SAVE_PATH):
                os.makedirs(Config.IMAGE_SAVE_PATH)
                print(f"创建图片保存目录:{Config.IMAGE_SAVE_PATH}")
            # Build a filesystem-safe file name while keeping as much of the
            # original title as possible.
            safe_filename = re.sub(r'[<>:"\\|?*\[\]]', '', title)
            safe_filename = safe_filename.strip()
            safe_filename = re.sub(r'\s+', '_', safe_filename)
            safe_filename = safe_filename[:100]  # cap the length
            print(f"开始下载图片:{url}")
            async with self.session.get(url, headers={'User-Agent': self.ua.random}) as response:
                if response.status == 200:
                    data = await response.read()
                    img = Image.open(BytesIO(data))
                    # Normalise to RGB: WebP converts directly; images with an
                    # alpha channel are composited onto a white background.
                    if img.format and img.format.lower() == 'webp':
                        img = img.convert('RGB')
                    elif img.mode in ['RGBA', 'LA']:
                        background = Image.new('RGB', img.size, (255, 255, 255))
                        if img.mode == 'RGBA':
                            background.paste(img, mask=img.split()[3])
                        else:
                            background.paste(img, mask=img.split()[1])
                        img = background
                    elif img.mode != 'RGB':
                        img = img.convert('RGB')
                    save_path = os.path.join(Config.IMAGE_SAVE_PATH, f"{safe_filename}.jpg")
                    img.save(save_path, 'JPEG', quality=95)
                    print(f"图片成功下载并保存到:{save_path}")
                    return save_path
                else:
                    print(f"图片下载失败HTTP状态码{response.status}URL{url}")
        except Exception as e:
            print(f"图片下载过程发生错误:{str(e)}URL{url}")
        return None

    async def fetch_rss(self):
        """Fetch up to Config.RSS_LIMIT feed entries and return a list of
        article dicts (title, content, local image path, download links,
        giveaway deadline, serial key, url).

        Entries without an image or a download link are skipped.  NOTE: the
        session is closed in the finally block, so a crawler instance is
        effectively single-use — callers create a fresh RSSCrawler per round.
        """
        print("开始获取RSS订阅内容...")
        items = []
        try:
            headers = {'User-Agent': self.ua.random}
            async with self.session.get(Config.RSS_URL, headers=headers) as response:
                if response.status != 200:
                    print(f"RSS获取失败HTTP状态码{response.status}")
                    return []
                content = await response.text()
            feed = feedparser.parse(content)
            if not feed.entries:
                print("RSS内容解析失败或内容为空")
                return []
            print(f"共获取到 {len(feed.entries)} 条内容,将处理前 {Config.RSS_LIMIT} 条")
            # Only the newest RSS_LIMIT entries are processed.
            for i, entry in enumerate(feed.entries[:Config.RSS_LIMIT], 1):
                print(f"\n开始处理第 {i} 条内容:{entry.title}")
                try:
                    # Prefer the full content body; fall back to description.
                    content = ''
                    if hasattr(entry, 'content'):
                        content = entry.content[0].value
                    elif hasattr(entry, 'description'):
                        content = entry.description
                    soup = BeautifulSoup(content, 'html.parser')
                    all_img_tags = soup.find_all('img')
                    if not all_img_tags:
                        print(f"第 {i} 条内容没有找到图片标签,跳过处理")
                        continue
                    download_urls = self.extract_download_urls(content)
                    if not download_urls:
                        print(f"第 {i} 条内容没有找到下载链接,跳过处理")
                        continue
                    # Download the first http(s) image that succeeds.
                    local_image = None
                    for img_tag in all_img_tags:
                        if 'src' not in img_tag.attrs:
                            continue
                        img_url = img_tag['src']
                        if not img_url.startswith('http'):
                            continue
                        local_image = await self.download_image(img_url, entry.title)
                        if local_image:
                            break
                    if not local_image:
                        print(f"第 {i} 条内容图片下载失败,跳过处理")
                        continue
                    # Giveaway deadline and serial key, when present.
                    free_time = self.parse_date(content)
                    serial_key = self.parse_serial_key(content)
                    # Assemble the article record consumed downstream.
                    item = {
                        'title': entry.title,
                        'pub_date': getattr(entry, 'published', '未知发布时间'),
                        'content': content,
                        'download_urls': download_urls,
                        'free_time': free_time,
                        'serial_key': serial_key,
                        'local_image': local_image,
                        'url': entry.link if hasattr(entry, 'link') else download_urls[0]  # dedup key for the database
                    }
                    print("\n文章信息获取成功:")
                    print(f"- 标题:{item['title']}")
                    print(f"- 发布时间:{item['pub_date']}")
                    print(f"- 限免时间:{item['free_time']}")
                    print(f"- 序列号:{item['serial_key']}")
                    print("- 下载链接:")
                    for url in item['download_urls']:
                        print(f" {url}")
                    print(f"- 本地图片路径:{item['local_image']}")
                    items.append(item)
                except Exception as e:
                    print(f"处理第 {i} 条内容时出错:{str(e)}")
                    continue
        except Exception as e:
            print(f"RSS获取过程中发生错误{str(e)}")
        finally:
            # Close the session so every crawl round uses a fresh client.
            await self.session.close()
            print("已关闭aiohttp会话")
        print(f"\n成功处理 {len(items)} 条内容")
        return items

117
db_manager.py Normal file
View File

@@ -0,0 +1,117 @@
import sqlite3
import os
from datetime import datetime, timedelta
class DatabaseManager:
    """SQLite-backed store tracking which article URLs have been processed
    and whether they were published.

    NOTE: methods are declared ``async`` for a uniform call style across the
    project, but all sqlite3 calls below are blocking and run on the event
    loop thread.
    """

    def __init__(self, db_path='articles.db'):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create the processed_articles table if it does not exist yet."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS processed_articles (
                    url TEXT PRIMARY KEY,
                    title TEXT,
                    processed_time TIMESTAMP,
                    content_type TEXT,
                    optimized_title TEXT,
                    published BOOLEAN DEFAULT 0
                )
            """)
            conn.commit()

    async def is_article_processed(self, url):
        """Return True when *url* already has a record in the database."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "SELECT url FROM processed_articles WHERE url = ?",
                (url,)
            )
            return cursor.fetchone() is not None

    async def add_processed_article(self, url, title, content_type=None, optimized_title=None):
        """Record *url* as processed but not yet published.

        Uses INSERT OR REPLACE so that re-processing an already-seen URL
        updates the existing row; the original plain INSERT raised
        sqlite3.IntegrityError on the url PRIMARY KEY for duplicates.
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                """INSERT OR REPLACE INTO processed_articles
                (url, title, processed_time, content_type, optimized_title, published)
                VALUES (?, ?, ?, ?, ?, ?)""",
                (url, title, datetime.now(), content_type, optimized_title, False)
            )
            conn.commit()

    async def mark_article_published(self, url):
        """Flip the published flag for *url* to True."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "UPDATE processed_articles SET published = ? WHERE url = ?",
                (True, url)
            )
            conn.commit()

    async def cleanup_old_records(self, days=7):
        """Delete records whose processed_time is older than *days* days."""
        cleanup_date = datetime.now() - timedelta(days=days)
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                "DELETE FROM processed_articles WHERE processed_time < ?",
                (cleanup_date,)
            )
            conn.commit()
            print(f"已清理 {days} 天前的数据库记录")

    async def get_recent_articles(self, limit=10):
        """Return the *limit* most recently processed rows as tuples of
        (url, title, processed_time, content_type, optimized_title)."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            cursor.execute(
                """SELECT url, title, processed_time, content_type, optimized_title
                FROM processed_articles
                ORDER BY processed_time DESC
                LIMIT ?""",
                (limit,)
            )
            return cursor.fetchall()

    async def cleanup_article_images(self, url, image_dir='images'):
        """Best-effort deletion of image files associated with *url*.

        NOTE(review): image files are matched by ``hash(url)`` but the crawler
        names files after the sanitised article *title*, and str hashes are
        salted per process — so this glob likely never matches. TODO confirm
        the intended filename scheme.  Kept behaviorally identical.
        """
        import glob  # hoisted from mid-function; local to keep module deps unchanged
        try:
            # Only clean up images for URLs we actually track.
            with sqlite3.connect(self.db_path) as conn:
                cursor = conn.cursor()
                cursor.execute(
                    "SELECT 1 FROM processed_articles WHERE url = ?",
                    (url,)
                )
                result = cursor.fetchone()
            if result:
                article_hash = str(hash(url))
                image_path = os.path.join(image_dir, f"{article_hash}*")
                matching_files = glob.glob(image_path)
                for file_path in matching_files:
                    try:
                        if os.path.exists(file_path):
                            os.remove(file_path)
                            print(f"已删除图片:{file_path}")
                    except Exception as e:
                        print(f"删除图片文件时出错:{file_path} - {str(e)}")
                if not matching_files:
                    print(f"未找到与URL对应的图片文件{url}")
        except Exception as e:
            print(f"清理图片时出错:{str(e)}")

94
main.py Normal file
View File

@@ -0,0 +1,94 @@
import asyncio
from crawler import RSSCrawler
from ai_analyzer import AIContentAnalyzer
from wp_publisher import WordPressPublisher
from db_manager import DatabaseManager
import os
import random
from datetime import datetime, timedelta
async def main():
    """Top-level daemon loop: run one processing round, then sleep for a
    random 3-8 minute interval before the next.  Any error is logged and the
    loop retries after a fixed 60-second back-off."""
    print("=== 喜加一自动发布系统启动 ===")
    while True:
        try:
            await process_items()
            # Purge database records older than the default retention window.
            await DatabaseManager().cleanup_old_records()
            # Randomised pause between crawling rounds.
            delay_minutes = random.uniform(3, 8)
            resume_at = (datetime.now() + timedelta(minutes=delay_minutes)).strftime('%H:%M:%S')
            print(f"本轮处理完成,将在 {delay_minutes:.2f} 分钟后({resume_at})开始下一轮采集")
            print(f"当前时间:{datetime.now().strftime('%H:%M:%S')}")
            await asyncio.sleep(delay_minutes * 60)
            print(f"等待结束,开始新一轮采集,当前时间:{datetime.now().strftime('%H:%M:%S')}")
        except Exception as e:
            print(f"系统运行出错:{str(e)}")
            print(f"错误类型:{type(e).__name__}")
            # Short back-off before retrying after a failure.
            await asyncio.sleep(60)
async def process_items():
    """Run one crawl → AI analysis → WordPress publish round.

    Already-processed URLs are skipped; per-item errors are logged and do
    not abort the round, while errors outside the item loop propagate."""
    print("\n=== 开始新一轮内容处理 ===")
    try:
        # Fresh components each round (the crawler's session is single-use).
        rss_crawler = RSSCrawler()
        ai = AIContentAnalyzer()
        wp = WordPressPublisher()
        db = DatabaseManager()

        print("\n1. 开始获取RSS内容...")
        articles = await rss_crawler.fetch_rss()
        if not articles:
            print("没有获取到新的内容,本轮处理结束")
            return

        for article in articles:
            try:
                # Deduplicate against previously processed URLs.
                if await db.is_article_processed(article['url']):
                    print(f"文章已处理过,跳过:{article['title']}")
                    continue
                print("\n2. 开始AI内容分析...")
                analysis = await ai.analyze_content(article['title'])
                # The publisher uses this handle to flip the published flag.
                article['db_manager'] = db
                # Record the article before publishing (published=False).
                await db.add_processed_article(
                    article['url'],
                    article['title'],
                    analysis['type'],
                    analysis['title']
                )
                print("\n3. 开始发布到WordPress...")
                await wp.publish_post(article, analysis)
                print("\n=== 内容处理完成 ===")
            except Exception as exc:
                print(f"处理内容项时出错:{str(exc)}")
                continue
    except Exception as e:
        print(f"处理过程出错:{str(e)}")
        raise e
# Script entry point: start the long-running auto-publishing loop.
if __name__ == "__main__":
    asyncio.run(main())

105
wp_publisher.py Normal file
View File

@@ -0,0 +1,105 @@
from wordpress_xmlrpc import Client, WordPressPost
from wordpress_xmlrpc.methods.posts import NewPost
from wordpress_xmlrpc.methods import media
from wordpress_xmlrpc.compat import xmlrpc_client
from config import Config
import os
class WordPressPublisher:
    """Publishes processed articles to a WordPress site over XML-RPC."""

    def __init__(self):
        # Synchronous XML-RPC client; endpoint and credentials from Config.
        self.client = Client(
            Config.WORDPRESS_URL,
            Config.WORDPRESS_USERNAME,
            Config.WORDPRESS_PASSWORD
        )

    async def publish_post(self, item, ai_content):
        """Build and publish a WordPress post.

        item: dict produced by RSSCrawler.fetch_rss, plus a 'db_manager'
              entry added by the caller (used to flag the article published).
        ai_content: dict from AIContentAnalyzer.analyze_content
              (keys: type, category_id, title, review).

        Raises Exception when the final publish call fails.  NOTE(review):
        declared async, but the XML-RPC calls below are blocking.
        """
        print(f"\n开始发布文章:{ai_content['title']}")
        post = WordPressPost()
        post.title = ai_content['title']
        # Assemble the post body as an ordered list of HTML fragments.
        content = []
        if item.get('local_image'):
            print("正在上传文章配图...")
            # Upload the local image to the WordPress media library and embed
            # the returned URL at the top of the post.
            with open(item['local_image'], 'rb') as img:
                data = {}
                data['name'] = os.path.basename(item['local_image'])
                data['type'] = 'image/jpeg'
                data['bits'] = xmlrpc_client.Binary(img.read())
                response = self.client.call(media.UploadFile(data))
                image_url = response['url']
                print(f"图片上传成功:{image_url}")
                content.append(f'<img src="{image_url}" alt="{ai_content["title"]}" />')
        print("正在构建文章内容...")
        content.append(f'<p>{ai_content["review"]}</p>')
        # Optional giveaway metadata sections.
        if item['free_time']:
            content.append(f'<hr /> <p><strong>限免时间:</strong><span style="color: #339966;">{item["free_time"]}</span></p>')
        if item['serial_key']:
            content.append(f'<hr /> <p><strong>序列号/注册码:</strong><span style="color: #339966;">{item["serial_key"]}</span></p>')
        if item['download_urls']:
            content.append('<hr /> <p><strong>喜加X地址</strong></p>')
            for url in item['download_urls']:
                content.append(f'<p><a href="{url}" target="_blank">{ai_content["title"]}</a></p>')
        post.content = '\n'.join(content)
        # Resolve WordPress categories (parent/child) from the AI category id.
        print(f"\n设置文章分类信息:")
        print(f"- AI分析的分类类型{ai_content['type']}")
        print(f"- 设置分类ID{ai_content['category_id']}")
        # Category id -> category name mapping used with terms_names.
        category_map = {
            1: "游戏",
            2: "软件",
            3: "Apps",
            57: "Android",
            56: "APP游戏",
            434: "IOS"
        }
        categories = []
        # Mobile categories (Android / APP games / IOS) also get the parent
        # "Apps" category attached.
        if ai_content['category_id'] in [57, 56, 434]:
            categories.append(category_map[3])
        # Then the article's primary category.
        if ai_content['category_id'] and ai_content['category_id'] in category_map:
            categories.append(category_map[ai_content['category_id']])
        post.terms_names = {'category': categories}  # categories set by name
        print(f"- 最终分类设置:{categories}")
        post.post_status = 'publish'
        print("\n正在发布文章...")
        try:
            post_id = self.client.call(NewPost(post))
            print(f"文章发布成功文章ID{post_id}")
            # Flip the published flag in the tracking database.
            await item['db_manager'].mark_article_published(item['url'])
            print("文章状态已更新!")
            # The uploaded copy lives on WordPress now; drop the local file.
            if item.get('local_image') and os.path.exists(item['local_image']):
                try:
                    os.remove(item['local_image'])
                    print(f"本地图片已删除:{item['local_image']}")
                except Exception as e:
                    print(f"删除本地图片失败:{str(e)}")
        except Exception as e:
            error_msg = f"文章发布失败: {str(e)}"
            print(f"错误:{error_msg}")
            raise Exception(error_msg)