Files
worklist/backend/routes.py
2025-12-30 09:03:29 +00:00

409 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
from models import db, Task, TimeRecord
from ai_service import ai_service
import json
api = Blueprint('api', __name__)
# 任务管理API
@api.route('/tasks', methods=['GET'])
def get_tasks():
"""获取所有任务"""
tasks = Task.query.order_by(Task.created_at.desc()).all()
return jsonify([task.to_dict() for task in tasks])
@api.route('/tasks', methods=['POST'])
def create_task():
"""创建新任务"""
data = request.get_json()
if not data or 'title' not in data:
return jsonify({'error': '任务标题不能为空'}), 400
task = Task(
title=data['title'],
description=data.get('description', ''),
status=data.get('status', 'pending')
)
db.session.add(task)
db.session.commit()
return jsonify(task.to_dict()), 201
@api.route('/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
"""更新任务"""
task = Task.query.get_or_404(task_id)
data = request.get_json()
if 'title' in data:
task.title = data['title']
if 'description' in data:
task.description = data['description']
if 'status' in data:
task.status = data['status']
task.updated_at = datetime.utcnow()
db.session.commit()
return jsonify(task.to_dict())
@api.route('/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):
"""删除任务"""
task = Task.query.get_or_404(task_id)
db.session.delete(task)
db.session.commit()
return jsonify({'message': '任务删除成功'})
@api.route('/tasks/<int:task_id>/polish', methods=['POST'])
def polish_task_description(task_id):
"""AI润色任务描述"""
task = Task.query.get_or_404(task_id)
if not ai_service.is_available():
return jsonify({'error': 'AI服务不可用请检查API密钥配置'}), 500
if not task.description:
return jsonify({'error': '任务描述为空,无法润色'}), 400
polished_description = ai_service.polish_description(task.description)
task.polished_description = polished_description
task.updated_at = datetime.utcnow()
db.session.commit()
return jsonify({
'original': task.description,
'polished': polished_description
})
# 计时器API
@api.route('/timer/start', methods=['POST'])
def start_timer():
"""开始计时"""
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': '任务ID不能为空'}), 400
task = Task.query.get_or_404(task_id)
# 检查是否已有进行中的计时
active_record = TimeRecord.query.filter_by(
task_id=task_id,
end_time=None
).first()
if active_record:
return jsonify({'error': '该任务已在计时中'}), 400
# 创建新的时间记录
time_record = TimeRecord(
task_id=task_id,
start_time=datetime.utcnow()
)
db.session.add(time_record)
task.status = 'in_progress'
db.session.commit()
return jsonify(time_record.to_dict())
@api.route('/timer/stop', methods=['POST'])
def stop_timer():
"""停止计时"""
data = request.get_json()
task_id = data.get('task_id')
if not task_id:
return jsonify({'error': '任务ID不能为空'}), 400
# 查找进行中的时间记录
time_record = TimeRecord.query.filter_by(
task_id=task_id,
end_time=None
).first()
if not time_record:
return jsonify({'error': '没有找到进行中的计时'}), 400
# 结束计时
time_record.end_time = datetime.utcnow()
time_record.calculate_duration()
# 更新任务状态
task = Task.query.get(task_id)
if task:
task.status = 'pending' # 或者根据业务逻辑设置其他状态
db.session.commit()
return jsonify(time_record.to_dict())
@api.route('/timer/status/<int:task_id>', methods=['GET'])
def get_timer_status(task_id):
"""获取任务计时状态"""
active_record = TimeRecord.query.filter_by(
task_id=task_id,
end_time=None
).first()
if active_record:
return jsonify({
'is_running': True,
'start_time': active_record.start_time.isoformat(),
'duration': int((datetime.utcnow() - active_record.start_time).total_seconds())
})
else:
return jsonify({'is_running': False})
@api.route('/timer/status/batch', methods=['POST'])
def get_timer_status_batch():
"""批量获取任务计时状态"""
data = request.get_json() or {}
task_ids = data.get('task_ids', [])
if not isinstance(task_ids, list) or not task_ids:
return jsonify({'error': '任务ID列表不能为空'}), 400
# 初始化默认状态
statuses = {int(task_id): {'is_running': False} for task_id in task_ids}
active_records = TimeRecord.query.filter(
TimeRecord.task_id.in_(task_ids),
TimeRecord.end_time.is_(None)
).all()
now = datetime.utcnow()
for record in active_records:
statuses[record.task_id] = {
'is_running': True,
'start_time': record.start_time.isoformat(),
'duration': int((now - record.start_time).total_seconds())
}
return jsonify(statuses)
# 统计报表API
@api.route('/reports/daily', methods=['GET'])
def get_daily_report():
"""获取日报表"""
date_str = request.args.get('date')
if date_str:
try:
target_date = datetime.strptime(date_str, '%Y-%m-%d').date()
except ValueError:
return jsonify({'error': '日期格式错误请使用YYYY-MM-DD格式'}), 400
else:
target_date = datetime.now().date()
# 获取指定日期的所有时间记录
start_datetime = datetime.combine(target_date, datetime.min.time())
end_datetime = datetime.combine(target_date, datetime.max.time())
records = TimeRecord.query.filter(
TimeRecord.start_time >= start_datetime,
TimeRecord.start_time <= end_datetime,
TimeRecord.end_time.isnot(None)
).all()
# 按任务分组统计
task_stats = {}
total_time = 0
for record in records:
task_id = record.task_id
if task_id not in task_stats:
task = Task.query.get(task_id)
task_stats[task_id] = {
'task': task.to_dict() if task else None,
'total_duration': 0,
'records': []
}
task_stats[task_id]['total_duration'] += record.duration or 0
task_stats[task_id]['records'].append(record.to_dict())
total_time += record.duration or 0
return jsonify({
'date': target_date.isoformat(),
'total_time': total_time,
'task_stats': list(task_stats.values())
})
@api.route('/reports/summary', methods=['GET'])
def get_summary_report():
"""获取汇总报表"""
days = int(request.args.get('days', 7)) # 默认最近7天
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days-1)
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
# 获取时间范围内的记录
records = TimeRecord.query.filter(
TimeRecord.start_time >= start_datetime,
TimeRecord.start_time <= end_datetime,
TimeRecord.end_time.isnot(None)
).all()
# 按日期分组
daily_stats = {}
for record in records:
date_key = record.start_time.date().isoformat()
if date_key not in daily_stats:
daily_stats[date_key] = {
'date': date_key,
'total_time': 0,
'tasks': {}
}
task_id = record.task_id
if task_id not in daily_stats[date_key]['tasks']:
task = Task.query.get(task_id)
daily_stats[date_key]['tasks'][task_id] = {
'task_title': task.title if task else f'任务{task_id}',
'total_time': 0
}
duration = record.duration or 0
daily_stats[date_key]['total_time'] += duration
daily_stats[date_key]['tasks'][task_id]['total_time'] += duration
return jsonify({
'period': f'{start_date.isoformat()}{end_date.isoformat()}',
'daily_stats': list(daily_stats.values())
})
# 时间段历史API
@api.route('/tasks/<int:task_id>/time-history', methods=['GET'])
def get_task_time_history(task_id):
"""获取任务的时间段历史"""
task = Task.query.get_or_404(task_id)
# 获取参数
days = int(request.args.get('days', 30)) # 默认最近30天
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 20))
# 计算日期范围
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days-1)
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
# 查询时间记录
query = TimeRecord.query.filter(
TimeRecord.task_id == task_id,
TimeRecord.start_time >= start_datetime,
TimeRecord.start_time <= end_datetime,
TimeRecord.end_time.isnot(None)
).order_by(TimeRecord.start_time.desc())
# 分页
pagination = query.paginate(
page=page,
per_page=per_page,
error_out=False
)
# 按日期分组
daily_segments = {}
for record in pagination.items:
date_key = record.start_time.date().isoformat()
if date_key not in daily_segments:
daily_segments[date_key] = {
'date': date_key,
'total_duration': 0,
'segments': []
}
daily_segments[date_key]['total_duration'] += record.duration or 0
daily_segments[date_key]['segments'].append(record.to_dict())
return jsonify({
'task': task.to_dict(),
'period': f'{start_date.isoformat()}{end_date.isoformat()}',
'daily_segments': list(daily_segments.values()),
'pagination': {
'page': pagination.page,
'pages': pagination.pages,
'per_page': pagination.per_page,
'total': pagination.total,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev
}
})
@api.route('/time-history', methods=['GET'])
def get_all_time_history():
"""获取所有任务的时间段历史"""
# 获取参数
days = int(request.args.get('days', 7)) # 默认最近7天
task_id = request.args.get('task_id') # 可选的任务ID过滤
# 计算日期范围
end_date = datetime.now().date()
start_date = end_date - timedelta(days=days-1)
start_datetime = datetime.combine(start_date, datetime.min.time())
end_datetime = datetime.combine(end_date, datetime.max.time())
# 构建查询
query = TimeRecord.query.filter(
TimeRecord.start_time >= start_datetime,
TimeRecord.start_time <= end_datetime,
TimeRecord.end_time.isnot(None)
)
if task_id:
query = query.filter(TimeRecord.task_id == task_id)
# 按开始时间排序
records = query.order_by(TimeRecord.start_time.desc()).all()
# 按日期和任务分组
daily_tasks = {}
for record in records:
date_key = record.start_time.date().isoformat()
task_id = record.task_id
if date_key not in daily_tasks:
daily_tasks[date_key] = {}
if task_id not in daily_tasks[date_key]:
task = Task.query.get(task_id)
daily_tasks[date_key][task_id] = {
'task': task.to_dict() if task else None,
'total_duration': 0,
'segments': []
}
daily_tasks[date_key][task_id]['total_duration'] += record.duration or 0
daily_tasks[date_key][task_id]['segments'].append(record.to_dict())
# 转换为列表格式
result = []
for date, tasks in daily_tasks.items():
day_data = {
'date': date,
'total_time': sum(task['total_duration'] for task in tasks.values()),
'tasks': list(tasks.values())
}
result.append(day_data)
# 按日期排序(最新的在前)
result.sort(key=lambda x: x['date'], reverse=True)
return jsonify({
'period': f'{start_date.isoformat()}{end_date.isoformat()}',
'daily_tasks': result
})