Files
PTGroup/api/statistics.py
DengDai 02ecea06f8 init
2025-12-09 13:08:38 +08:00

301 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, request, jsonify, make_response
from flask_jwt_extended import get_jwt_identity
from models import db, Task, User
from auth import login_required, admin_required
from datetime import datetime, timedelta
from sqlalchemy import func, and_
import csv
from io import StringIO
# Blueprint on which the statistics endpoints in this module are registered.
statistics_bp = Blueprint('statistics', __name__)
# Per-user statistics
@statistics_bp.route('/statistics/user/<int:user_id>', methods=['GET'])
@login_required
def get_user_statistics(user_id):
    """Return completion statistics for one user as a JSON response.

    Query parameters (all optional):
        date_from: first day to include, formatted ``YYYY-MM-DD``.
        date_to: last day to include (the whole day), ``YYYY-MM-DD``.
        group_id: only count completed tasks belonging to this group.

    A date that does not match ``YYYY-MM-DD`` is ignored, i.e. treated as
    if the parameter had not been supplied.

    Returns:
        200 with ``user``, ``total_completed``, ``claimed_pending`` and
        ``daily_stats`` (one ``{'date', 'count'}`` entry per day);
        403 when a non-admin requests another user's statistics;
        404 when ``user_id`` does not match an existing user.

    ``claimed_pending`` counts every task currently in status ``claimed``
    for the user; it is not narrowed by the date or group filters.
    """
    current_user_id = int(get_jwt_identity())
    current_user = db.session.get(User, current_user_id)

    # Non-admins may only view their own statistics.
    if current_user.role != 'admin' and current_user_id != user_id:
        return jsonify({'error': '无权查看他人统计'}), 403

    # Fail with a clear 404 instead of an AttributeError further down.
    user = db.session.get(User, user_id)
    if user is None:
        return jsonify({'error': '用户不存在'}), 404

    group_id = request.args.get('group_id', type=int)
    date_from = request.args.get('date_from')
    date_to = request.args.get('date_to')

    # Both bounds start as None so that an unparseable value simply leaves
    # the corresponding filter off rather than leaving a name undefined.
    date_from_obj = None
    date_to_end = None
    if date_from:
        try:
            date_from_obj = datetime.strptime(date_from, '%Y-%m-%d')
        except ValueError:
            date_from_obj = None
    if date_to:
        try:
            # Exclusive upper bound at the following midnight, so the whole
            # of the requested day is covered, including sub-second values.
            date_to_end = (
                datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)
            )
        except ValueError:
            date_to_end = None

    # One shared filter list keeps the total and the per-day breakdown
    # consistent with each other (group filter included).
    filters = [Task.claimed_by == user_id, Task.status == 'completed']
    if group_id:
        filters.append(Task.group_id == group_id)
    if date_from_obj is not None:
        filters.append(Task.completed_at >= date_from_obj)
    if date_to_end is not None:
        filters.append(Task.completed_at < date_to_end)

    # Total number of completed tasks in the selected range.
    total_completed = Task.query.filter(*filters).count()

    # Completed tasks grouped by calendar day.
    completed_day = func.date(Task.completed_at)
    daily_stats = (
        db.session.query(
            completed_day.label('date'),
            func.count(Task.id).label('count'),
        )
        .filter(*filters)
        .group_by(completed_day)
        .all()
    )

    # Tasks the user has claimed but not yet completed.
    claimed_pending = Task.query.filter_by(
        claimed_by=user_id,
        status='claimed',
    ).count()

    return jsonify({
        'user': {
            'id': user.id,
            'username': user.username,
        },
        'total_completed': total_completed,
        'claimed_pending': claimed_pending,
        'daily_stats': [
            {'date': str(day), 'count': count}
            for day, count in daily_stats
        ],
    })
# Group leaderboard
@statistics_bp.route('/statistics/leaderboard', methods=['GET'])
@login_required
def get_leaderboard():
    """Return the top 20 users by completed-task count as JSON.

    Query parameters (all optional):
        group_id: only count tasks belonging to this group.
        period: ``daily`` (since today's 00:00 UTC) or ``monthly`` (since
            the first of the current month, 00:00 UTC). Any other value is
            treated as ``monthly``, and the response reports ``monthly`` so
            the label always matches the window that was actually used.

    Returns:
        200 with ``period`` and ``leaderboard``, a list of
        ``{'rank', 'user_id', 'username', 'completed_count'}`` ordered by
        descending count. Ties are ordered by ascending user id so ranks
        are stable between requests.
    """
    group_id = request.args.get('group_id', type=int)

    period = request.args.get('period', 'monthly')  # daily/monthly
    if period != 'daily':
        period = 'monthly'

    midnight = datetime.utcnow().replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    start_date = midnight if period == 'daily' else midnight.replace(day=1)

    completed_count = func.count(Task.id)
    query = (
        db.session.query(
            User.id,
            User.username,
            completed_count.label('completed_count'),
        )
        .join(Task, Task.claimed_by == User.id)
        .filter(
            Task.status == 'completed',
            Task.completed_at >= start_date,
        )
    )
    if group_id:
        query = query.filter(Task.group_id == group_id)

    leaderboard = (
        query.group_by(User.id, User.username)
        .order_by(completed_count.desc(), User.id)
        .limit(20)
        .all()
    )

    return jsonify({
        'period': period,
        'leaderboard': [
            {
                'rank': rank,
                'user_id': row.id,
                'username': row.username,
                'completed_count': row.completed_count,
            }
            for rank, row in enumerate(leaderboard, start=1)
        ],
    })
# Overall statistics (available to both admins and regular users)
@statistics_bp.route('/statistics/overview', methods=['GET'])
@login_required
def get_overview():
    """Return overview statistics for one group as a JSON response.

    Admins see figures for every task in the group; other users only see
    tasks they have claimed.

    Query parameters (all optional):
        group_id: group to report on (defaults to 1).
        date_from: first day to include, formatted ``YYYY-MM-DD``.
        date_to: last day to include (the whole day), ``YYYY-MM-DD``.

    A date that does not match ``YYYY-MM-DD`` is ignored. The date range
    affects ``total_completed`` and ``trend`` only; without ``date_from``
    the trend starts seven days before the current UTC time.

    Returns:
        200 with ``status_counts``, ``today_completed``,
        ``month_completed``, ``total_completed``, ``claimed_pending``,
        ``is_admin`` and ``trend`` (one ``{'date', 'count'}`` per day).
    """
    # Same int conversion get_user_statistics applies, so the primary-key
    # lookup and the claimed_by comparison both use an integer even when
    # the JWT identity is carried as a string.
    current_user_id = int(get_jwt_identity())
    current_user = db.session.get(User, current_user_id)
    is_admin = current_user.role == 'admin'

    group_id = request.args.get('group_id', type=int, default=1)
    date_from = request.args.get('date_from')
    date_to = request.args.get('date_to')

    # Base conditions shared by every query below.
    base_filter = [Task.group_id == group_id]
    # Regular users only see their own tasks.
    if not is_admin:
        base_filter.append(Task.claimed_by == current_user_id)

    # Parse the optional date range; malformed values are ignored.
    date_from_obj = None
    date_to_end = None
    if date_from:
        try:
            date_from_obj = datetime.strptime(date_from, '%Y-%m-%d')
        except ValueError:
            date_from_obj = None
    if date_to:
        try:
            # Exclusive upper bound at the following midnight, so the whole
            # of the requested day is covered, including sub-second values.
            date_to_end = (
                datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)
            )
        except ValueError:
            date_to_end = None

    # Number of tasks per status.
    status_counts = (
        db.session.query(Task.status, func.count(Task.id).label('count'))
        .filter(and_(*base_filter))
        .group_by(Task.status)
        .all()
    )

    # Sample the clock once so "today", "this month" and the default trend
    # window are all derived from the same instant.
    now = datetime.utcnow()
    today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    month_start = today.replace(day=1)

    completed_base = base_filter + [Task.status == 'completed']

    # Completed today.
    today_completed = Task.query.filter(
        and_(*completed_base, Task.completed_at >= today)
    ).count()

    # Completed this month.
    month_completed = Task.query.filter(
        and_(*completed_base, Task.completed_at >= month_start)
    ).count()

    # Trend: the requested range, or the last 7 days by default.
    if date_from_obj is not None:
        trend_start = date_from_obj
    else:
        trend_start = now - timedelta(days=7)
    trend_filter = completed_base + [Task.completed_at >= trend_start]
    if date_to_end is not None:
        trend_filter.append(Task.completed_at < date_to_end)
    completed_day = func.date(Task.completed_at)
    trend = (
        db.session.query(
            completed_day.label('date'),
            func.count(Task.id).label('count'),
        )
        .filter(and_(*trend_filter))
        .group_by(completed_day)
        .all()
    )

    # Total completed within the requested range.
    completed_filter = list(completed_base)
    if date_from_obj is not None:
        completed_filter.append(Task.completed_at >= date_from_obj)
    if date_to_end is not None:
        completed_filter.append(Task.completed_at < date_to_end)
    total_completed = Task.query.filter(and_(*completed_filter)).count()

    # Claimed but not yet completed.
    claimed_pending = Task.query.filter(
        and_(*base_filter, Task.status == 'claimed')
    ).count()

    return jsonify({
        'status_counts': {status: count for status, count in status_counts},
        'today_completed': today_completed,
        'month_completed': month_completed,
        'total_completed': total_completed,
        'claimed_pending': claimed_pending,
        'is_admin': is_admin,
        'trend': [
            {'date': str(day), 'count': count} for day, count in trend
        ],
    })
# Export statistics (CSV format)
@statistics_bp.route('/statistics/export', methods=['GET'])
@login_required
def export_statistics():
    """Export completed tasks of one group as a CSV file download.

    Admins export every completed task in the group; other users only
    export tasks they have claimed. Rows are ordered by completion time,
    newest first.

    Query parameters (all optional):
        group_id: group to export (defaults to 1).
        date_from: first day to include, formatted ``YYYY-MM-DD``.
        date_to: last day to include (the whole day), ``YYYY-MM-DD``.

    A date that does not match ``YYYY-MM-DD`` is ignored.

    Returns:
        A ``text/csv`` attachment encoded as UTF-8 with a leading byte
        order mark, named ``statistics_<YYYYmmdd_HHMMSS>.csv``.
    """
    # Same int conversion get_user_statistics applies, so the primary-key
    # lookup and the claimed_by filter both use an integer even when the
    # JWT identity is carried as a string.
    current_user_id = int(get_jwt_identity())
    current_user = db.session.get(User, current_user_id)

    group_id = request.args.get('group_id', type=int, default=1)
    date_from = request.args.get('date_from')
    date_to = request.args.get('date_to')

    # Build the query conditions.
    query = Task.query.filter_by(group_id=group_id, status='completed')
    # Regular users may only export their own data.
    if current_user.role != 'admin':
        query = query.filter_by(claimed_by=current_user_id)

    # Date-range filtering; malformed values are ignored.
    if date_from:
        try:
            date_from_obj = datetime.strptime(date_from, '%Y-%m-%d')
        except ValueError:
            date_from_obj = None
        if date_from_obj is not None:
            query = query.filter(Task.completed_at >= date_from_obj)
    if date_to:
        try:
            # Exclusive upper bound at the following midnight, so the whole
            # of the requested day is covered, including sub-second values.
            date_to_end = (
                datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)
            )
        except ValueError:
            date_to_end = None
        if date_to_end is not None:
            query = query.filter(Task.completed_at < date_to_end)

    tasks = query.order_by(Task.completed_at.desc()).all()

    # Generate the CSV in memory.
    timestamp_format = '%Y-%m-%d %H:%M:%S'
    output = StringIO()
    writer = csv.writer(output)
    # Header row.
    writer.writerow(['任务ID', '剧集名称', '优先级', '认领人', '种子ID', '完成时间', '认领时间', '耗时(小时)'])
    # Data rows.
    # NOTE(review): free-text cells such as series_name are written as-is;
    # if they can hold user-supplied text beginning with '=', '+', '-' or
    # '@', spreadsheet formula injection should be considered.
    for task in tasks:
        # Hours between claiming and completing, when both are known.
        duration = ''
        if task.claimed_at and task.completed_at:
            delta = task.completed_at - task.claimed_at
            duration = f'{delta.total_seconds() / 3600:.1f}'
        claimer_name = task.claimer.username if task.claimer else '-'
        writer.writerow([
            task.id,
            task.series_name,
            task.priority or '-',
            claimer_name,
            task.torrent_id or '-',
            task.completed_at.strftime(timestamp_format) if task.completed_at else '-',
            task.claimed_at.strftime(timestamp_format) if task.claimed_at else '-',
            duration,
        ])

    # Build the response. 'utf-8-sig' is a Python codec name rather than an
    # HTTP charset and does not add a BOM by itself, so the BOM is written
    # explicitly (lets spreadsheet tools detect UTF-8 for the non-ASCII
    # header) and the charset is declared as plain utf-8.
    response = make_response('\ufeff' + output.getvalue())
    response.headers['Content-Type'] = 'text/csv; charset=utf-8'
    response.headers['Content-Disposition'] = f'attachment; filename=statistics_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
    return response