Files
PTGen/pt_gen/services/douban.py
DengDai 644b5aaaf8 init
2025-12-08 14:47:24 +08:00

127 lines
5.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# pt_gen/services/douban.py
import re
import httpx
from bs4 import BeautifulSoup, NavigableString
from typing import Optional, Dict, List
class DoubanScraper:
    """Fetch a Douban movie subject page and extract its metadata.

    The public entry point is :meth:`scrape_movie_info`; everything else is
    a private helper. Parsing is deliberately best-effort: a missing element
    on the page results in a missing/empty value, never an exception.
    """

    # Seconds to wait for the Douban response before giving up.
    _REQUEST_TIMEOUT = 20
    # Upper bound on the number of cast members stored in ``info['actors']``.
    _MAX_ACTORS = 15

    def __init__(self, cookie: Optional[str] = None):
        """Prepare the HTTP headers used for every request.

        Args:
            cookie: Optional raw ``Cookie`` header value. When truthy it is
                sent with each request; otherwise no cookie header is set.
        """
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'
        }
        if cookie:
            self.headers['Cookie'] = cookie

    def _get_info_text(self, soup_tag: 'BeautifulSoup', label: str) -> Optional[str]:
        """Return the value that follows a labelled field inside the #info block.

        Looks for a ``<span class="pl">`` whose text matches ``label`` and then
        walks its following siblings until it finds meaningful text.

        Args:
            soup_tag: The element to search in (the ``#info`` div).
            label: Field label such as "导演". It is compiled as a regular
                expression, so regex metacharacters keep their meaning.

        Returns:
            The first non-empty value after the label, or ``None`` when the
            label is absent or its field is empty.
        """
        label_tag = soup_tag.find('span', class_='pl', string=re.compile(label))
        if not label_tag:
            return None

        for node in label_tag.next_siblings:
            if isinstance(node, str):
                # Text nodes (bs4's NavigableString subclasses ``str``).
                # Strip the ASCII/full-width colon *before* testing for
                # emptiness: a bare ": " separator between the label and the
                # value tag must be skipped, not returned as "".
                text = node.strip().strip(':：').strip()
            elif getattr(node, 'name', None) == 'br':
                # A line break ends the field; going further would leak the
                # next field's label/value into this one.
                break
            else:
                # Join nested strings with a space so "A</a> / <a>B" stays
                # "A / B" and can still be split on ' / '.
                text = node.get_text(' ', strip=True)
            if text:
                return text
        return None

    def _split_info(self, text: Optional[str]) -> List[str]:
        """Split a ' / '-separated string into a list of trimmed, non-empty items.

        Args:
            text: The raw field value, or ``None``/empty.

        Returns:
            The individual items; an empty list when there is nothing to split.
        """
        if not text:
            return []
        parts = (item.strip() for item in text.split(' / '))
        return [part for part in parts if part]

    async def scrape_movie_info(self, douban_id: str) -> Dict:
        """Download and parse the Douban page for ``douban_id``.

        Args:
            douban_id: The numeric subject id as it appears in the page URL.

        Returns:
            A dict of extracted fields (title, year, poster, crew, genres,
            synopsis, rating, awards, ...). An empty dict is returned when the
            page could not be fetched; the failure is reported via ``print``.
        """
        html = await self._fetch_subject_page(douban_id)
        if html is None:
            return {}
        return self._parse_subject_page(html)

    async def _fetch_subject_page(self, douban_id: str) -> Optional[str]:
        """Return the HTML of the subject page, or ``None`` on any HTTP/network error."""
        url = f"https://movie.douban.com/subject/{douban_id}/"
        try:
            async with httpx.AsyncClient(headers=self.headers, follow_redirects=True) as client:
                response = await client.get(url, timeout=self._REQUEST_TIMEOUT)
                response.raise_for_status()
        except httpx.HTTPStatusError as e:
            print(f"请求豆瓣页面失败: {e.response.status_code}")
            if e.response.status_code == 403:
                print("访问被拒绝 (403 Forbidden)。请检查你的 Cookie 是否有效或 IP 是否被限制。")
            return None
        except httpx.RequestError as e:
            print(f"请求豆瓣时发生网络错误: {e}")
            return None
        return response.text

    def _parse_subject_page(self, html: str) -> Dict:
        """Extract the movie fields from the subject page HTML."""
        soup = BeautifulSoup(html, 'html.parser')
        info = {}

        # 1. Title and year. Both sub-elements are optional: a missing span or
        #    an unexpected year format simply leaves the key out.
        title_tag = soup.find('h1')
        if title_tag:
            name_span = title_tag.find('span', property='v:itemreviewed')
            if name_span:
                info['chinese_title'] = name_span.text.strip()
            year_span = title_tag.find('span', class_='year')
            if year_span:
                year_match = re.search(r'\((\d{4})\)', year_span.text)
                if year_match:
                    info['year'] = year_match.group(1)

        # 2. Poster: swap the small-poster path segment for the large one.
        poster_img = soup.find('img', rel='v:image')
        if poster_img and poster_img.get('src'):
            info['poster_url'] = poster_img['src'].replace('/s_ratio_poster/', '/l_ratio_poster/')

        # 3. Structured fields from the #info block.
        info_div = soup.find('div', id='info')
        if info_div:
            info['directors'] = self._split_info(self._get_info_text(info_div, '导演'))
            info['writers'] = self._split_info(self._get_info_text(info_div, '编剧'))

            # Prefer the dedicated cast span (one <a> per actor); fall back to
            # the generic label lookup when it is absent.
            actors_tag = info_div.find('span', class_='actor')
            if actors_tag:
                actors = [a.text.strip() for a in actors_tag.find_all('a')]
            else:
                actors = self._split_info(self._get_info_text(info_div, '主演'))
            info['actors'] = actors[:self._MAX_ACTORS]

            info['countries'] = self._split_info(self._get_info_text(info_div, '制片国家/地区'))
            info['spoken_languages'] = self._split_info(self._get_info_text(info_div, '语言'))

            # Only the first release date is kept.
            release_dates = self._split_info(self._get_info_text(info_div, '上映日期'))
            if release_dates:
                info['release_date'] = release_dates[0]

            info['runtime'] = self._get_info_text(info_div, '片长')
            info['aka_titles'] = self._split_info(self._get_info_text(info_div, '又名'))

            imdb_link_text = self._get_info_text(info_div, 'IMDb')
            if imdb_link_text:
                imdb_match = re.search(r'(tt\d+)', imdb_link_text)
                if imdb_match:
                    info['imdb_id'] = imdb_match.group(1)

        # 4. Genres, read from the property='v:genre' spans.
        info['genres'] = [g.get_text(strip=True) for g in soup.find_all('span', property='v:genre')]

        # 5. Synopsis: the v:summary span first, then the hidden full-text
        #    span, otherwise an empty string.
        synopsis_span = soup.find('span', property='v:summary')
        if not synopsis_span:
            synopsis_span = soup.find('span', class_='all hidden')
        if synopsis_span:
            info['synopsis'] = synopsis_span.get_text(strip=True).replace('\u3000', '')
        else:
            info['synopsis'] = ""

        # 6. Rating. Strip first so whitespace-only text counts as "no rating".
        rating_strong = soup.find('strong', property='v:average')
        rating = rating_strong.get_text(strip=True) if rating_strong else ""
        info['douban_rating'] = f"{rating}/10" if rating else "N/A"

        # 7. Awards: one entry per <li> of the award list.
        awards_ul = soup.find('ul', class_='award')
        info['awards'] = [li.get_text(strip=True, separator=' ') for li in awards_ul.find_all('li')] if awards_ul else []

        return info