fix(api): restrict project and task queries to current user

- Filter projects endpoint to return only projects owned by current user
- Filter tasks endpoint to return only tasks from current user's projects
- Update statistics endpoint to calculate stats scoped to current user's data
- Add filtering logic for projects, tasks, and issues based on user ownership
- Improve data isolation and security by preventing cross-user data access
This commit is contained in:
lintsinghua 2025-11-28 01:11:21 +08:00
parent 5676211b20
commit b733181663
2 changed files with 27 additions and 6 deletions

View File

@ -102,9 +102,11 @@ async def read_projects(
current_user: User = Depends(deps.get_current_user), current_user: User = Depends(deps.get_current_user),
) -> Any: ) -> Any:
""" """
Retrieve projects. Retrieve projects for current user.
""" """
query = select(Project).options(selectinload(Project.owner)) query = select(Project).options(selectinload(Project.owner))
# 只返回当前用户的项目
query = query.where(Project.owner_id == current_user.id)
if not include_deleted: if not include_deleted:
query = query.where(Project.is_active == True) query = query.where(Project.is_active == True)
query = query.order_by(Project.created_at.desc()).offset(skip).limit(limit) query = query.order_by(Project.created_at.desc()).offset(skip).limit(limit)
@ -134,15 +136,26 @@ async def get_stats(
current_user: User = Depends(deps.get_current_user), current_user: User = Depends(deps.get_current_user),
) -> Any: ) -> Any:
""" """
Get overall statistics. Get statistics for current user.
""" """
projects_result = await db.execute(select(Project)) # 只统计当前用户的项目
projects_result = await db.execute(
select(Project).where(Project.owner_id == current_user.id)
)
projects = projects_result.scalars().all() projects = projects_result.scalars().all()
project_ids = [p.id for p in projects]
tasks_result = await db.execute(select(AuditTask)) # 只统计当前用户项目的任务
tasks_result = await db.execute(
select(AuditTask).where(AuditTask.project_id.in_(project_ids)) if project_ids else select(AuditTask).where(False)
)
tasks = tasks_result.scalars().all() tasks = tasks_result.scalars().all()
task_ids = [t.id for t in tasks]
issues_result = await db.execute(select(AuditIssue)) # 只统计当前用户任务的问题
issues_result = await db.execute(
select(AuditIssue).where(AuditIssue.task_id.in_(task_ids)) if task_ids else select(AuditIssue).where(False)
)
issues = issues_result.scalars().all() issues = issues_result.scalars().all()
return { return {

View File

@ -91,9 +91,17 @@ async def list_tasks(
current_user: User = Depends(deps.get_current_user), current_user: User = Depends(deps.get_current_user),
) -> Any: ) -> Any:
""" """
List all tasks, optionally filtered by project. List tasks for current user's projects.
""" """
# 先获取当前用户的项目ID列表
projects_result = await db.execute(
select(Project.id).where(Project.owner_id == current_user.id)
)
user_project_ids = [p[0] for p in projects_result.fetchall()]
query = select(AuditTask).options(selectinload(AuditTask.project)) query = select(AuditTask).options(selectinload(AuditTask.project))
# 只返回当前用户项目的任务
query = query.where(AuditTask.project_id.in_(user_project_ids)) if user_project_ids else query.where(False)
if project_id: if project_id:
query = query.where(AuditTask.project_id == project_id) query = query.where(AuditTask.project_id == project_id)
query = query.order_by(AuditTask.created_at.desc()) query = query.order_by(AuditTask.created_at.desc())