Files
Skit-Panel/backend/utils/plex_utils.py
DengDai 519589f8f5 init
2025-12-08 14:45:14 +08:00

142 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from plexapi.server import PlexServer
from plexapi.exceptions import Unauthorized, NotFound
log = logging.getLogger(__name__)
class PlexManager:
"""
用于与 Plex Media Server 交互的工具类。
- 适配项目 config.json 格式。
- 在发生错误时抛出异常,而不是返回错误字典。
- 成功时直接返回数据。
"""
def __init__(self, config):
"""
根据配置初始化 PlexManager。
:param config: 来自 config.json 的 'plex' 部分。
"""
self.config = config
self.token = config.get('token')
host = config.get('host')
port = config.get('port')
use_ssl = config.get('use_ssl', False)
if not all([host, port, self.token]):
self.baseurl = None
else:
scheme = 'https' if use_ssl else 'http'
self.baseurl = f"{scheme}://{host}:{port}"
self.server = None
log.debug(f"PlexManager initialized for URL: {self.baseurl}")
def _connect(self):
"""
连接到 Plex 服务器。如果连接已建立则直接返回。
如果失败,则会抛出异常。
"""
if self.server:
return
if not self.baseurl:
raise ValueError("Plex 未配置 (host, port, token)")
try:
log.debug(f"尝试连接到 Plex: {self.baseurl}")
# plexapi 内部有重试机制,但我们可以设置一个合理的超时
self.server = PlexServer(self.baseurl, self.token, timeout=10)
# 访问一个属性来验证连接
_ = self.server.friendlyName
log.info(f"成功连接到 Plex 服务器: {self.server.friendlyName}")
except Unauthorized:
log.error("Plex 连接失败: Token 无效或权限不足")
raise # 将原始异常重新抛出
except Exception as e:
log.error(f"Plex 连接失败: {e}", exc_info=True)
self.server = None # 连接失败时重置 server 对象
raise # 重新抛出,让调用者处理
def test_connection(self):
"""
测试与 Plex 的连接,并返回一个易于前端展示的状态元组。
:return: (bool, str) -> (是否成功, 消息)
"""
try:
self._connect()
message = f"连接成功: {self.server.friendlyName} (v{self.server.version})"
return True, message
except Exception as e:
return False, f"连接失败: {e}"
def get_stats(self):
"""获取所有媒体库的项目总数,用于仪表盘。"""
try:
self._connect()
return len(self.server.library.sections())
except Exception:
return 0 # 如果连接失败返回0
def get_libraries(self):
"""
获取媒体库列表。
:return: list of dicts, 每个字典代表一个媒体库
"""
self._connect()
libraries = []
log.debug("正在获取 Plex 媒体库列表")
for section in self.server.library.sections():
libraries.append({
'id': section.key,
'name': section.title,
'type': section.type,
'path': ', '.join(section.locations)
})
log.debug(f"成功获取 {len(libraries)} 个 Plex 媒体库")
return libraries
def get_library_items(self, library_id):
"""
获取指定媒体库中的所有项目。
此方法会抛出 NotFound 或 ValueError 异常,需要调用者捕获。
:param library_id: 媒体库的 ID (key)
:return: list of dicts, 每个字典代表一个媒体项目
"""
self._connect()
log.debug(f"正在获取 Plex 媒体库 '{library_id}' 的项目")
# plexapi v4.12.0+ sectionByID 接受字符串或整数
section = self.server.library.sectionByID(library_id)
items = []
if section.type not in ['movie', 'show']:
log.warning(f"跳过不支持的 Plex 库类型: {section.type}")
return items
for item in section.all():
item_data = {
'id': item.ratingKey,
'title': item.title,
'type': item.type,
'year': item.year,
'summary': item.summary,
'poster': self.server.url(item.thumb, True) if item.thumb else None,
'path': ', '.join(item.locations),
}
items.append(item_data)
log.debug(f"从库 '{section.title}' 获取到 {len(items)} 个项目")
return items
def refresh_library(self, library_id):
"""
发送刷新指定媒体库的请求。
:param library_id: 媒体库的 ID (key)
"""
self._connect()
log.debug(f"正在请求刷新 Plex 媒体库: {library_id}")
section = self.server.library.sectionByID(library_id)
section.update()
log.info(f"已向 Plex 媒体库 '{section.title}' 发送刷新请求")