import httpx
from typing import Optional, Dict, Any


class TMDBClient:
    BASE_URL = "https://api.themoviedb.org/3"

    def __init__(self, api_key: str):
        self.api_key = api_key

    async def _make_request(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> Optional[Dict[str, Any]]:
        if params is None:
            params = {}
        # Always include the api_key and language parameters
        params['api_key'] = self.api_key
        params['language'] = 'zh-CN'

        url = f"{self.BASE_URL}{endpoint}"
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(url, params=params)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                print(f"TMDB API request failed: {e.response.status_code} for URL: {e.request.url}")
                return None
            except httpx.RequestError as e:
                print(f"TMDB request error: {e}")
                return None

    async def find_by_imdb_id(self, imdb_id: str) -> Optional[Dict[str, Any]]:
        endpoint = f"/find/{imdb_id}"
        return await self._make_request(endpoint, params={'external_source': 'imdb_id'})

    # --- Refactored get_movie_details ---
    async def get_movie_details(self, tmdb_id: str) -> Optional[Dict[str, Any]]:
        params = {
            'append_to_response': 'credits,alternative_titles'
        }
        endpoint = f"/movie/{tmdb_id}"
        data = await self._make_request(endpoint, params=params)
        if not data:
            return None
        return self._parse_movie_details(data)

    def _parse_movie_details(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Parse the raw data returned by the TMDB API into a clean dictionary."""
        directors = []
        if 'credits' in data and 'crew' in data['credits']:
            for member in data['credits']['crew']:
                if member.get('job') == 'Director':
                    directors.append(member['name'])

        writers = []
        if 'credits' in data and 'crew' in data['credits']:
            for member in data['credits']['crew']:
                # Treat every member of the 'Writing' department as a writer
                # (a stricter filter could additionally require a job of 'Writer', 'Screenplay', or 'Story')
                if member.get('department') == 'Writing':
                    # Avoid adding the same person twice
                    if member['name'] not in writers:
                        writers.append(member['name'])

        actors = []
        if 'credits' in data and 'cast' in data['credits']:
            for member in data['credits']['cast']:
                actors.append(member['name'])

        genres = [genre['name'] for genre in data.get('genres', [])]
        countries = [country['name'] for country in data.get('production_countries', [])]

        aka_titles = []
        if 'alternative_titles' in data and 'titles' in data['alternative_titles']:
            for item in data['alternative_titles']['titles']:
                if item.get('iso_3166_1') == 'CN':
                    aka_titles.append(item['title'])

        runtime_min = data.get('runtime')
        # Format runtime as a human-readable string, e.g. "118分钟" (minutes), matching language='zh-CN'
        runtime = f"{runtime_min}分钟" if runtime_min else None

        spoken_languages = [lang['english_name'] for lang in data.get('spoken_languages', [])]

        parsed_data = {
            "id": data.get('id'),
            "imdb_id": data.get('imdb_id'),
            "title": data.get('title'),
            "original_title": data.get('original_title'),
            "overview": data.get('overview'),
            "release_date": data.get('release_date', ''),
            "runtime": runtime,
            "spoken_languages": spoken_languages,
            "tagline": data.get('tagline'),
            "poster_path": data.get('poster_path'),
            "vote_average": data.get('vote_average', 0),
            "directors": directors,
            "writers": writers,
            "actors": actors,
            "genres": genres,
            "countries": countries,
            "aka_titles": aka_titles
        }
        return parsed_data
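

# Minimal usage sketch (assumptions for illustration only: "YOUR_TMDB_API_KEY" is a placeholder
# and "550" is just an example TMDB movie id; neither appears in the class above).
if __name__ == "__main__":
    import asyncio

    async def main():
        client = TMDBClient(api_key="YOUR_TMDB_API_KEY")  # placeholder API key
        details = await client.get_movie_details("550")   # example TMDB id
        if details:
            print(details["title"], details["directors"])

    asyncio.run(main())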