import logging from plexapi.server import PlexServer from plexapi.exceptions import Unauthorized, NotFound log = logging.getLogger(__name__) class PlexManager: """ 用于与 Plex Media Server 交互的工具类。 - 适配项目 config.json 格式。 - 在发生错误时抛出异常,而不是返回错误字典。 - 成功时直接返回数据。 """ def __init__(self, config): """ 根据配置初始化 PlexManager。 :param config: 来自 config.json 的 'plex' 部分。 """ self.config = config self.token = config.get('token') host = config.get('host') port = config.get('port') use_ssl = config.get('use_ssl', False) if not all([host, port, self.token]): self.baseurl = None else: scheme = 'https' if use_ssl else 'http' self.baseurl = f"{scheme}://{host}:{port}" self.server = None log.debug(f"PlexManager initialized for URL: {self.baseurl}") def _connect(self): """ 连接到 Plex 服务器。如果连接已建立则直接返回。 如果失败,则会抛出异常。 """ if self.server: return if not self.baseurl: raise ValueError("Plex 未配置 (host, port, token)") try: log.debug(f"尝试连接到 Plex: {self.baseurl}") # plexapi 内部有重试机制,但我们可以设置一个合理的超时 self.server = PlexServer(self.baseurl, self.token, timeout=10) # 访问一个属性来验证连接 _ = self.server.friendlyName log.info(f"成功连接到 Plex 服务器: {self.server.friendlyName}") except Unauthorized: log.error("Plex 连接失败: Token 无效或权限不足") raise # 将原始异常重新抛出 except Exception as e: log.error(f"Plex 连接失败: {e}", exc_info=True) self.server = None # 连接失败时重置 server 对象 raise # 重新抛出,让调用者处理 def test_connection(self): """ 测试与 Plex 的连接,并返回一个易于前端展示的状态元组。 :return: (bool, str) -> (是否成功, 消息) """ try: self._connect() message = f"连接成功: {self.server.friendlyName} (v{self.server.version})" return True, message except Exception as e: return False, f"连接失败: {e}" def get_stats(self): """获取所有媒体库的项目总数,用于仪表盘。""" try: self._connect() return len(self.server.library.sections()) except Exception: return 0 # 如果连接失败,返回0 def get_libraries(self): """ 获取媒体库列表。 :return: list of dicts, 每个字典代表一个媒体库 """ self._connect() libraries = [] log.debug("正在获取 Plex 媒体库列表") for section in self.server.library.sections(): libraries.append({ 'id': section.key, 'name': section.title, 'type': section.type, 'path': ', '.join(section.locations) }) log.debug(f"成功获取 {len(libraries)} 个 Plex 媒体库") return libraries def get_library_items(self, library_id): """ 获取指定媒体库中的所有项目。 此方法会抛出 NotFound 或 ValueError 异常,需要调用者捕获。 :param library_id: 媒体库的 ID (key) :return: list of dicts, 每个字典代表一个媒体项目 """ self._connect() log.debug(f"正在获取 Plex 媒体库 '{library_id}' 的项目") # plexapi v4.12.0+ sectionByID 接受字符串或整数 section = self.server.library.sectionByID(library_id) items = [] if section.type not in ['movie', 'show']: log.warning(f"跳过不支持的 Plex 库类型: {section.type}") return items for item in section.all(): item_data = { 'id': item.ratingKey, 'title': item.title, 'type': item.type, 'year': item.year, 'summary': item.summary, 'poster': self.server.url(item.thumb, True) if item.thumb else None, 'path': ', '.join(item.locations), } items.append(item_data) log.debug(f"从库 '{section.title}' 获取到 {len(items)} 个项目") return items def refresh_library(self, library_id): """ 发送刷新指定媒体库的请求。 :param library_id: 媒体库的 ID (key) """ self._connect() log.debug(f"正在请求刷新 Plex 媒体库: {library_id}") section = self.server.library.sectionByID(library_id) section.update() log.info(f"已向 Plex 媒体库 '{section.title}' 发送刷新请求")