This commit is contained in:
DengDai
2025-12-08 14:47:24 +08:00
commit 644b5aaaf8
21 changed files with 1543 additions and 0 deletions

0
pt_gen/__init__.py Normal file
View File

0
pt_gen/api/__init__.py Normal file
View File

44
pt_gen/api/endpoints.py Normal file
View File

@@ -0,0 +1,44 @@
from fastapi import APIRouter, Depends, Header, HTTPException, status
from pydantic import BaseModel
from typing import Optional
from pt_gen.core.config import get_settings, Settings
from pt_gen.core.orchestrator import InfoOrchestrator
router = APIRouter()
from functools import lru_cache
@lru_cache()
def get_orchestrator():
# 注意这里 get_settings() 会被自动调用和缓存
return InfoOrchestrator(settings=get_settings())
# API Key 验证依赖,现在它也依赖于 get_settings
async def verify_api_key(
x_api_key: str = Header(...),
settings: Settings = Depends(get_settings)
):
if x_api_key != settings.api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API Key",
)
class GenRequest(BaseModel):
url: str
@router.post(
"/gen",
response_model=str,
dependencies=[Depends(verify_api_key)]
)
async def generate_description(
request: GenRequest,
orchestrator: InfoOrchestrator = Depends(get_orchestrator)
):
"""
根据传入的豆瓣链接,生成格式化的介绍文本。
"""
result = await orchestrator.generate_info(request.url)
if not result or "无效" in result:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=result or "无法生成信息,请检查链接。",
)
return result

0
pt_gen/core/__init__.py Normal file
View File

50
pt_gen/core/config.py Normal file
View File

@@ -0,0 +1,50 @@
import yaml
from functools import lru_cache
from pydantic import BaseModel
from pydantic_settings import BaseSettings
from typing import Optional
CONFIG_PATH = "configs/config.yaml"
class TMDBConfig(BaseModel):
api_key: Optional[str] = None
class DoubanConfig(BaseModel):
cookie: Optional[str] = None
class RedisConfig(BaseModel):
host: str
port: int
db: int
cache_ttl_seconds: int
class UploaderConfig(BaseModel):
enable: bool
api_url: str
api_key: str
class Settings(BaseSettings):
api_key: str
tmdb: TMDBConfig
douban: DoubanConfig
redis: RedisConfig
uploader: UploaderConfig
def set_config_path(path: str = "configs/config.yaml"):
"""允许在启动时设置配置文件路径"""
global CONFIG_PATH
CONFIG_PATH = path
@lru_cache() # 使用 lru_cache 确保配置文件只被读取和解析一次
def get_settings() -> Settings:
"""
加载并返回配置对象。
这是一个可被 FastAPI 依赖注入的函数。
"""
try:
with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)
return Settings.parse_obj(config_data)
except FileNotFoundError:
raise RuntimeError(f"配置文件未找到: {CONFIG_PATH}")
except Exception as e:
raise RuntimeError(f"加载配置文件失败: {e}")

177
pt_gen/core/orchestrator.py Normal file
View File

@@ -0,0 +1,177 @@
import re
from typing import Optional
from jinja2 import Environment, FileSystemLoader
from pt_gen.core.config import Settings
from pt_gen.models.movie import MovieInfo
from pt_gen.services.cache import RedisCache
from pt_gen.services.douban import DoubanScraper
from pt_gen.services.tmdb import TMDBClient
from pt_gen.services.uploader import ImageUploader
class InfoOrchestrator:
def __init__(self, settings: Settings):
self.settings = settings
self.tmdb = TMDBClient(api_key=self.settings.tmdb.api_key)
self.douban = DoubanScraper(cookie=self.settings.douban.cookie)
self.cache = RedisCache(
host=self.settings.redis.host,
port=self.settings.redis.port,
db=self.settings.redis.db
)
if self.settings.uploader.enable:
self.uploader = ImageUploader(
api_url=self.settings.uploader.api_url,
api_key=self.settings.uploader.api_key
)
else:
self.uploader = None
self.jinja_env = Environment(loader=FileSystemLoader('templates'))
def _extract_douban_id(self, url: str) -> Optional[str]:
match = re.search(r'douban\.com/subject/(\d+)', url)
return match.group(1) if match else None
def _extract_imdb_id(self, url: str) -> Optional[str]:
match = re.search(r'imdb\.com/title/(tt\d+)', url)
return match.group(1) if match else None
def _extract_tmdb_id(self, url: str) -> Optional[str]:
match = re.search(r'themoviedb\.org/(?:movie|tv)/(\d+)', url)
return match.group(1) if match else None
async def generate_info(self, url: str) -> Optional[str]:
# --- 步骤 1: 识别URL类型并提取初始ID ---
douban_id = self._extract_douban_id(url)
imdb_id = self._extract_imdb_id(url)
tmdb_id = self._extract_tmdb_id(url)
if not (douban_id or imdb_id or tmdb_id):
return "无效或不支持的链接。请输入有效的豆瓣、IMDb或TMDB链接。"
# --- 步骤 2: ID 补全 (以 IMDb ID 为核心) ---
# 如果从 TMDB 链接开始,先找到 IMDb ID
if tmdb_id and not imdb_id:
details = await self.tmdb.get_movie_details(tmdb_id)
if details and details.get('imdb_id'):
imdb_id = details['imdb_id']
# 如果从豆瓣链接开始,爬取豆瓣页面来找到 IMDb ID
if douban_id and not imdb_id:
# 临时爬一次,只为获取 IMDb ID
temp_douban_data = await self.douban.scrape_movie_info(douban_id)
if temp_douban_data.get('imdb_id'):
imdb_id = temp_douban_data['imdb_id']
# --- 步骤 3: 查缓存 (使用唯一的 IMDb ID 作为 key) ---
if imdb_id:
cache_key = f"movieinfo:{imdb_id}"
cached_result = await self.cache.get(cache_key)
if cached_result:
print(f"命中缓存: {cache_key}")
return cached_result
else:
# 如果没有IMDb ID (例如一个没有IMDB链接的冷门豆瓣电影), 则用豆瓣ID做缓存key
cache_key = f"movieinfo:douban:{douban_id}"
# --- 步骤 4: 分别获取数据 ---
douban_data = {}
tmdb_data = {}
# 如果有豆瓣ID就从豆瓣获取数据
if douban_id:
douban_data = await self.douban.scrape_movie_info(douban_id)
if not douban_data:
return "从豆瓣获取信息失败请检查链接或Cookie。"
# 如果有IMDb ID就从TMDB获取数据
if imdb_id:
tmdb_find_data = await self.tmdb.find_by_imdb_id(imdb_id)
if tmdb_find_data and tmdb_find_data.get('movie_results'):
# 确保 tmdb_id 被正确设置
tmdb_id = tmdb_find_data['movie_results'][0]['id']
tmdb_data = await self.tmdb.get_movie_details(tmdb_id)
if imdb_id and not tmdb_id:
tmdb_find_data = await self.tmdb.find_by_imdb_id(imdb_id)
if tmdb_find_data and tmdb_find_data.get('movie_results'):
tmdb_id = tmdb_find_data['movie_results'][0]['id']
if tmdb_id:
# 现在 get_movie_details 返回的是干净的数据
tmdb_data = await self.tmdb.get_movie_details(tmdb_id)
# --- 步骤 5: 智能合并数据到 MovieInfo 对象 ---
movie = MovieInfo()
# 优先使用TMDB数据因为通常更规范
if tmdb_data:
movie.tmdb_id = tmdb_data.get('id')
movie.imdb_id = tmdb_data.get('imdb_id')
movie.original_title = tmdb_data.get('original_title')
movie.chinese_title = tmdb_data.get('title')
movie.aka_titles = tmdb_data.get('aka_titles', [])
movie.year = tmdb_data.get('release_date', '')[:4]
movie.release_date = tmdb_data.get('release_date')
movie.runtime = tmdb_data.get('runtime')
movie.spoken_languages = tmdb_data.get('spoken_languages', [])
movie.tagline = tmdb_data.get('tagline')
movie.imdb_rating = f"{tmdb_data.get('vote_average', 0):.1f}/10"
movie.directors = tmdb_data.get('directors', [])
movie.writers = tmdb_data.get('writers', [])
movie.actors = tmdb_data.get('actors', [])
movie.genres = tmdb_data.get('genres', [])
movie.countries = tmdb_data.get('countries', [])
# TMDB的简介可能是空的先用着后面会被豆瓣覆盖
movie.synopsis = tmdb_data.get('overview', '')
if tmdb_data.get('poster_path'):
movie.poster_url = f"https://image.tmdb.org/t/p/original{tmdb_data['poster_path']}"
# 使用豆瓣数据进行补充或覆盖
if douban_data:
movie.douban_id = douban_id
movie.douban_link = f"https://movie.douban.com/subject/{douban_id}/"
# 智能填充如果TMDB没值才用豆瓣的
movie.chinese_title = movie.chinese_title or douban_data.get('chinese_title')
movie.year = movie.year or douban_data.get('year')
movie.runtime = movie.runtime or douban_data.get('runtime')
movie.spoken_languages = movie.spoken_languages or douban_data.get('spoken_languages', [])
movie.directors = movie.directors or douban_data.get('directors', [])
movie.writers = movie.writers or douban_data.get('writers', [])
movie.actors = movie.actors or douban_data.get('actors', [])
movie.genres = movie.genres or douban_data.get('genres', [])
movie.countries = movie.countries or douban_data.get('countries', [])
movie.poster_url = movie.poster_url or douban_data.get('poster_url')
# 强制覆盖:豆瓣的简介、评分和获奖通常更符合中文区需求
if douban_data.get('synopsis'):
movie.synopsis = douban_data.get('synopsis')
movie.douban_rating = douban_data.get('douban_rating', 'N/A')
movie.awards = douban_data.get('awards', [])
# 确保IMDb链接存在
if movie.imdb_id:
movie.imdb_link = f"https://www.imdb.com/title/{movie.imdb_id}/"
# 确保标题有备用值
if not movie.original_title:
movie.original_title = movie.chinese_title
if not movie.chinese_title:
movie.chinese_title = movie.original_title
# --- 步骤 6: 后续处理 (图片转存、渲染、缓存) ---
if self.uploader and movie.poster_url:
print(f"开始转存图片: {movie.poster_url}")
movie.poster_url = await self.uploader.upload(movie.poster_url)
print(f"转存成功新URL: {movie.poster_url}")
template = self.jinja_env.get_template('description.jinja2')
final_text = template.render(movie.dict())
# 写入缓存
await self.cache.set(cache_key, final_text, ttl=self.settings.redis.cache_ttl_seconds)
print("\n" + "="*40 + " FINAL RENDERED OUTPUT " + "="*40)
print(final_text)
print("="*103 + "\n")
return final_text

View File

32
pt_gen/models/movie.py Normal file
View File

@@ -0,0 +1,32 @@
from typing import List, Optional
from pydantic import BaseModel
class MovieInfo(BaseModel):
douban_id: Optional[str] = None
imdb_id: Optional[str] = None
tmdb_id: Optional[int] = None
chinese_title: Optional[str] = None
original_title: Optional[str] = None
aka_titles: List[str] = []
year: Optional[str] = None
poster_url: Optional[str] = None
countries: List[str] = []
genres: List[str] = []
release_date: Optional[str] = None
runtime: Optional[str] = None
douban_rating: Optional[str] = None
douban_link: Optional[str] = None
imdb_rating: Optional[str] = None
imdb_link: Optional[str] = None
directors: List[str] = []
writers: List[str] = []
actors: List[str] = []
spoken_languages: List[str] = []
tagline: Optional[str] = None
synopsis: str = ""
awards: List[str] = []

View File

15
pt_gen/services/cache.py Normal file
View File

@@ -0,0 +1,15 @@
import redis.asyncio as redis
from typing import Optional
class RedisCache:
def __init__(self, host: str, port: int, db: int):
self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
async def get(self, key: str) -> Optional[str]:
return await self.client.get(key)
async def set(self, key: str, value: str, ttl: int):
await self.client.setex(key, ttl, value)
async def close(self):
await self.client.close()

126
pt_gen/services/douban.py Normal file
View File

@@ -0,0 +1,126 @@
# pt_gen/services/douban.py
import re
import httpx
from bs4 import BeautifulSoup, NavigableString
from typing import Optional, Dict, List
class DoubanScraper:
def __init__(self, cookie: Optional[str] = None):
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
}
if cookie:
self.headers['Cookie'] = cookie
def _get_info_text(self, soup_tag: BeautifulSoup, label: str) -> Optional[str]:
"""
一个辅助函数,用于在 #info 块中通过标签名(如“导演”)查找信息。
它查找包含指定标签文本的 <span class="pl"> 元素,然后获取其后的文本内容。
"""
tag = soup_tag.find('span', class_='pl', string=re.compile(label))
if tag:
# next_sibling 可能是 NavigableString, 也可能是 Tag
# 我们需要循环直到找到有意义的文本
next_node = tag.next_sibling
while next_node:
if isinstance(next_node, NavigableString) and next_node.strip():
return next_node.strip().strip(':').strip()
# 如果是 Tag我们尝试获取它的文本
if hasattr(next_node, 'get_text') and next_node.get_text(strip=True):
return next_node.get_text(strip=True)
next_node = next_node.next_sibling
return None
def _split_info(self, text: Optional[str]) -> List[str]:
"""将通过' / '分隔的字符串拆分为列表"""
if not text:
return []
return [item.strip() for item in text.split(' / ')]
async def scrape_movie_info(self, douban_id: str) -> Dict:
url = f"https://movie.douban.com/subject/{douban_id}/"
try:
async with httpx.AsyncClient(headers=self.headers, follow_redirects=True) as client:
response = await client.get(url, timeout=20)
response.raise_for_status()
except httpx.HTTPStatusError as e:
print(f"请求豆瓣页面失败: {e.response.status_code}")
if e.response.status_code == 403:
print("访问被拒绝 (403 Forbidden)。请检查你的 Cookie 是否有效或 IP 是否被限制。")
return {}
except httpx.RequestError as e:
print(f"请求豆瓣时发生网络错误: {e}")
return {}
soup = BeautifulSoup(response.text, 'html.parser')
info = {}
# 1. 标题和年份
title_tag = soup.find('h1')
if title_tag:
info['chinese_title'] = title_tag.find('span', property='v:itemreviewed').text.strip()
year_span = title_tag.find('span', class_='year')
if year_span:
info['year'] = re.search(r'\((\d{4})\)', year_span.text).group(1)
# 2. 海报
poster_img = soup.find('img', rel='v:image')
if poster_img and poster_img.get('src'):
info['poster_url'] = poster_img['src'].replace('/s_ratio_poster/', '/l_ratio_poster/')
# 3. #info 块的结构化解析
info_div = soup.find('div', id='info')
if info_div:
# 使用辅助函数和分割函数来获取信息
info['directors'] = self._split_info(self._get_info_text(info_div, '导演'))
info['writers'] = self._split_info(self._get_info_text(info_div, '编剧'))
# 主演信息可能跟在a标签后单独处理
actors_tag = info_div.find('span', class_='actor')
if actors_tag:
actors_list = [a.text.strip() for a in actors_tag.find_all('a')]
info['actors'] = actors_list[:15] # 最多取15个主演
else: # 备用方案
info['actors'] = self._split_info(self._get_info_text(info_div, '主演'))[:15]
info['countries'] = self._split_info(self._get_info_text(info_div, '制片国家/地区'))
info['spoken_languages'] = self._split_info(self._get_info_text(info_div, '语言'))
release_date_text = self._get_info_text(info_div, '上映日期')
if release_date_text:
# 只取第一个上映日期
info['release_date'] = self._split_info(release_date_text)[0]
info['runtime'] = self._get_info_text(info_div, '片长')
info['aka_titles'] = self._split_info(self._get_info_text(info_div, '又名'))
imdb_link_text = self._get_info_text(info_div, 'IMDb')
if imdb_link_text:
imdb_match = re.search(r'(tt\d+)', imdb_link_text)
if imdb_match:
info['imdb_id'] = imdb_match.group(1)
# 4. 类型 (Genres) - property='v:genre' 的方式更可靠
info['genres'] = [g.get_text(strip=True) for g in soup.find_all('span', property='v:genre')]
# 5. 简介
synopsis_span = soup.find('span', property='v:summary')
if synopsis_span:
info['synopsis'] = synopsis_span.get_text(strip=True).replace('\u3000', '')
else: # 备用方案
hidden_synopsis = soup.find('span', class_='all hidden')
if hidden_synopsis:
info['synopsis'] = hidden_synopsis.get_text(strip=True).replace('\u3000', '')
else:
info['synopsis'] = ""
# 6. 评分
rating_strong = soup.find('strong', property='v:average')
info['douban_rating'] = f"{rating_strong.text}/10" if rating_strong and rating_strong.text else "N/A"
# 7. 获奖
awards_ul = soup.find('ul', class_='award')
info['awards'] = [li.get_text(strip=True, separator=' ') for li in awards_ul.find_all('li')] if awards_ul else []
return info

109
pt_gen/services/tmdb.py Normal file
View File

@@ -0,0 +1,109 @@
import httpx
from typing import Optional, List, Dict, Any
class TMDBClient:
BASE_URL = "https://api.themoviedb.org/3"
def __init__(self, api_key: str):
self.api_key = api_key
async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
if params is None:
params = {}
# 总是包含 api_key 和 language
params['api_key'] = self.api_key
params['language'] = 'zh-CN'
url = f"{self.BASE_URL}{endpoint}"
async with httpx.AsyncClient() as client:
try:
response = await client.get(url, params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
print(f"TMDB API 请求失败: {e.response.status_code} for URL: {e.request.url}")
return None
except httpx.RequestError as e:
print(f"TMDB 请求错误: {e}")
return None
async def find_by_imdb_id(self, imdb_id: str) -> Optional[Dict[str, Any]]:
endpoint = f"/find/{imdb_id}"
return await self._make_request(endpoint, params={'external_source': 'imdb_id'})
# --- 重构 get_movie_details 方法 ---
async def get_movie_details(self, tmdb_id: str) -> Optional[Dict[str, Any]]:
params = {
'append_to_response': 'credits,alternative_titles'
}
endpoint = f"/movie/{tmdb_id}"
data = await self._make_request(endpoint, params=params)
if not data:
return None
return self._parse_movie_details(data)
def _parse_movie_details(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""将从 TMDB API 获取的原始数据解析为干净的字典"""
directors = []
if 'credits' in data and 'crew' in data['credits']:
for member in data['credits']['crew']:
if member.get('job') == 'Director':
directors.append(member['name'])
writers = []
writing_jobs = {'Writer', 'Screenplay', 'Story'}
if 'credits' in data and 'crew' in data['credits']:
for member in data['credits']['crew']:
# 检查部门是否为'Writing'并且职位是我们想要的
if member.get('department') == 'Writing':# and member.get('job') in writing_jobs:
# 避免重复添加同一个人
if member['name'] not in writers:
writers.append(member['name'])
actors = []
if 'credits' in data and 'cast' in data['credits']:
for member in data['credits']['cast']:
actors.append(member['name'])
genres = [genre['name'] for genre in data.get('genres', [])]
countries = [country['name'] for country in data.get('production_countries', [])]
aka_titles = []
if 'alternative_titles' in data and 'titles' in data['alternative_titles']:
for item in data['alternative_titles']['titles']:
if item.get('iso_3166_1') == 'CN':
aka_titles.append(item['title'])
runtime_min = data.get('runtime')
runtime = f"{runtime_min}分钟" if runtime_min else None
spoken_languages = [lang['english_name'] for lang in data.get('spoken_languages', [])]
parsed_data = {
"id": data.get('id'),
"imdb_id": data.get('imdb_id'),
"title": data.get('title'),
"original_title": data.get('original_title'),
"overview": data.get('overview'),
"release_date": data.get('release_date', ''),
"runtime": runtime,
"spoken_languages": spoken_languages,
"tagline": data.get('tagline'),
"tagline": data.get('tagline'),
"poster_path": data.get('poster_path'),
"vote_average": data.get('vote_average', 0),
"directors": directors,
"writers": writers,
"actors": actors,
"genres": genres,
"countries": countries,
"aka_titles": aka_titles
}
return parsed_data

View File

@@ -0,0 +1,34 @@
import httpx
from . import uploader
class ImageUploader:
def __init__(self, api_url: str, api_key: str):
self.api_url = api_url
self.api_key = api_key
async def upload(self, image_url: str) -> str:
"""
从一个URL下载图片然后上传到你自己的图床。
你需要根据你的图床 API 修改这部分代码。
"""
try:
async with httpx.AsyncClient() as client:
# 1. 下载图片
get_resp = await client.get(image_url)
get_resp.raise_for_status()
image_bytes = get_resp.content
# 2. 上传到你的图床
# 这是一个通用示例,你需要修改 files 和 headers
files = {'file': ('poster.jpg', image_bytes, 'image/jpeg')}
headers = {'Authorization': f'Bearer {self.api_key}'}
post_resp = await client.post(self.api_url, files=files, headers=headers)
post_resp.raise_for_status()
# 3. 解析响应返回新的图片URL
# 假设返回的JSON是 {"data": {"url": "..."}}
return post_resp.json()['data']['url']
except Exception as e:
print(f"图片上传失败: {e}")
return image_url # 上传失败返回原始URL