# pt_gen/services/douban.py
import re
from typing import Optional, Dict, List

import httpx
from bs4 import BeautifulSoup, NavigableString


class DoubanScraper:
    """Scrape movie metadata from a Douban movie subject page.

    The scraper downloads ``https://movie.douban.com/subject/<id>/`` and
    extracts title, year, poster, credits, genres, synopsis, rating and
    awards into a plain dictionary.
    """

    # Upper bound on the number of leading actors kept in the result.
    MAX_ACTORS = 15

    # A four-digit year wrapped in parentheses, e.g. "(2010)".
    _YEAR_RE = re.compile(r'\((\d{4})\)')
    # An IMDb title identifier, e.g. "tt1375666".
    _IMDB_ID_RE = re.compile(r'(tt\d+)')

    def __init__(self, cookie: Optional[str] = None):
        """Prepare the request headers.

        Args:
            cookie: Optional raw ``Cookie`` header value sent with every
                request. Omitted from the headers when falsy.
        """
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        }
        if cookie:
            self.headers['Cookie'] = cookie

    def _get_info_text(self, soup_tag: BeautifulSoup, label: str) -> Optional[str]:
        """Return the value that follows a labelled field inside the ``#info`` block.

        Finds the ``<span class="pl">`` whose text contains ``label`` and
        walks its following siblings until one yields non-empty text.

        Args:
            soup_tag: The parsed element to search in (normally ``div#info``).
            label: Literal label text to look for, e.g. ``'导演'``.

        Returns:
            The field value with surrounding whitespace and a leading or
            trailing colon removed, or ``None`` when the label is absent or
            has no value before the end of its line.
        """
        # The label is literal text, so escape it before using it as a regex.
        label_tag = soup_tag.find('span', class_='pl', string=re.compile(re.escape(label)))
        if label_tag is None:
            return None

        for node in label_tag.next_siblings:
            if isinstance(node, NavigableString):
                # Strip the separator colon (ASCII or full-width) BEFORE the
                # emptiness check; otherwise a bare ": " text node would be
                # returned as an empty value and the real value that lives
                # in the following tag would never be reached.
                text = node.strip().strip(':：').strip()
                if text:
                    return text
                continue

            # A <br> ends the current field; going further would return the
            # label or value of the *next* field.
            if getattr(node, 'name', None) == 'br':
                break

            # Join child strings with a space so "A", "/", "B" spread across
            # several tags become "A / B", which _split_info can split.
            text = node.get_text(' ', strip=True)
            if text:
                return text
        return None

    def _split_info(self, text: Optional[str]) -> List[str]:
        """Split a ``' / '``-separated string into a list of trimmed items.

        Returns an empty list when ``text`` is ``None`` or empty.
        """
        if not text:
            return []
        return [item.strip() for item in text.split(' / ')]

    async def _fetch_html(self, url: str) -> Optional[str]:
        """Download ``url`` and return the response body, or ``None`` on failure.

        HTTP error statuses and network errors are reported with ``print``
        and swallowed, so callers only have to handle ``None``.
        """
        try:
            async with httpx.AsyncClient(headers=self.headers, follow_redirects=True) as client:
                response = await client.get(url, timeout=20)
                response.raise_for_status()
                return response.text
        except httpx.HTTPStatusError as e:
            print(f"请求豆瓣页面失败: {e.response.status_code}")
            if e.response.status_code == 403:
                print("访问被拒绝 (403 Forbidden)。请检查你的 Cookie 是否有效或 IP 是否被限制。")
            return None
        except httpx.RequestError as e:
            print(f"请求豆瓣时发生网络错误: {e}")
            return None

    def _parse_info_block(self, info_div, info: Dict) -> None:
        """Fill ``info`` with the fields found in the ``div#info`` element."""
        info['directors'] = self._split_info(self._get_info_text(info_div, '导演'))
        info['writers'] = self._split_info(self._get_info_text(info_div, '编剧'))

        # Actors are normally links inside span.actor; fall back to the
        # generic label lookup when that wrapper is missing.
        actors_tag = info_div.find('span', class_='actor')
        if actors_tag:
            actors = [a.text.strip() for a in actors_tag.find_all('a')]
        else:
            actors = self._split_info(self._get_info_text(info_div, '主演'))
        info['actors'] = actors[:self.MAX_ACTORS]

        info['countries'] = self._split_info(self._get_info_text(info_div, '制片国家/地区'))
        info['spoken_languages'] = self._split_info(self._get_info_text(info_div, '语言'))

        release_date_text = self._get_info_text(info_div, '上映日期')
        if release_date_text:
            # Only the first release date is kept.
            info['release_date'] = self._split_info(release_date_text)[0]

        info['runtime'] = self._get_info_text(info_div, '片长')
        info['aka_titles'] = self._split_info(self._get_info_text(info_div, '又名'))

        imdb_text = self._get_info_text(info_div, 'IMDb')
        if imdb_text:
            imdb_match = self._IMDB_ID_RE.search(imdb_text)
            if imdb_match:
                info['imdb_id'] = imdb_match.group(1)

    def _parse_movie_page(self, html: str) -> Dict:
        """Extract the movie fields from the HTML of a subject page."""
        soup = BeautifulSoup(html, 'html.parser')
        info: Dict = {}

        # 1. Title and year. Each piece is optional: a missing span or a
        #    year that does not look like "(YYYY)" simply leaves the key out
        #    instead of raising AttributeError.
        title_tag = soup.find('h1')
        if title_tag:
            name_span = title_tag.find('span', property='v:itemreviewed')
            if name_span:
                info['chinese_title'] = name_span.text.strip()
            year_span = title_tag.find('span', class_='year')
            if year_span:
                year_match = self._YEAR_RE.search(year_span.text)
                if year_match:
                    info['year'] = year_match.group(1)

        # 2. Poster: swap the small-ratio path segment for the large one.
        poster_img = soup.find('img', rel='v:image')
        if poster_img and poster_img.get('src'):
            info['poster_url'] = poster_img['src'].replace('/s_ratio_poster/', '/l_ratio_poster/')

        # 3. Structured fields of the #info block.
        info_div = soup.find('div', id='info')
        if info_div:
            self._parse_info_block(info_div, info)

        # 4. Genres, read from the property='v:genre' spans.
        info['genres'] = [g.get_text(strip=True) for g in soup.find_all('span', property='v:genre')]

        # 5. Synopsis, with the "all hidden" span as fallback.
        # NOTE(review): if the page carries both a v:summary span and an
        # "all hidden" span, the former may be a shortened text — confirm
        # against live pages whether the lookup order should be reversed.
        synopsis_span = soup.find('span', property='v:summary')
        if synopsis_span is None:
            synopsis_span = soup.find('span', class_='all hidden')
        if synopsis_span:
            info['synopsis'] = synopsis_span.get_text(strip=True).replace('\u3000', '')
        else:
            info['synopsis'] = ""

        # 6. Rating. Strip first so a whitespace-only node yields "N/A"
        #    rather than " /10".
        rating_strong = soup.find('strong', property='v:average')
        rating_text = rating_strong.get_text(strip=True) if rating_strong else ''
        info['douban_rating'] = f"{rating_text}/10" if rating_text else "N/A"

        # 7. Awards.
        awards_ul = soup.find('ul', class_='award')
        info['awards'] = (
            [li.get_text(strip=True, separator=' ') for li in awards_ul.find_all('li')]
            if awards_ul
            else []
        )

        return info

    async def scrape_movie_info(self, douban_id: str) -> Dict:
        """Fetch and parse the Douban page of a movie.

        Args:
            douban_id: The numeric subject id as it appears in the page URL.

        Returns:
            A dictionary of extracted fields. ``genres``, ``synopsis``,
            ``douban_rating`` and ``awards`` are always present once the page
            was fetched; the other keys appear only when the corresponding
            markup is found. An empty dictionary is returned when the page
            could not be downloaded.
        """
        url = f"https://movie.douban.com/subject/{douban_id}/"
        html = await self._fetch_html(url)
        if html is None:
            return {}
        return self._parse_movie_page(html)