feat(prompts-rules): add prompt templates and audit rules management

- Add database migration (004) to create prompt_templates, audit_rule_sets, and audit_rules tables with proper indexes
- Create PromptTemplate and AuditRule models with relationships and validation
- Implement prompt template API endpoints for CRUD operations and testing
- Implement audit rules API endpoints for CRUD operations and rule set management
- Add prompt and rules schemas for request/response validation
- Create prompt template initialization service with default system templates
- Integrate LLM service with prompt template system for dynamic prompt selection
- Add frontend pages for PromptManager and AuditRules management
- Add API client utilities for prompts and rules endpoints
- Update API router to include new prompts and rules endpoints
- Update database initialization to seed default templates and rules
- Update sidebar navigation to include new management pages
- Update frontend routes to support new prompt and rules management pages
This commit is contained in:
lintsinghua 2025-12-09 21:42:00 +08:00
parent 4f0e8a2982
commit 357b9cc0a7
18 changed files with 3826 additions and 2 deletions

View File

@ -0,0 +1,100 @@
"""add prompts and rules tables
Revision ID: 004_add_prompts_and_rules
Revises: add_source_type_001
Create Date: 2024-01-01 00:00:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '004_add_prompts_and_rules'
down_revision = 'add_source_type_001'
branch_labels = None
depends_on = None
def upgrade() -> None:
# 创建提示词模板表
op.create_table(
'prompt_templates',
sa.Column('id', sa.String(), nullable=False),
sa.Column('name', sa.String(100), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('template_type', sa.String(50), nullable=True, default='system'),
sa.Column('content_zh', sa.Text(), nullable=True),
sa.Column('content_en', sa.Text(), nullable=True),
sa.Column('variables', sa.Text(), nullable=True, default='{}'),
sa.Column('is_default', sa.Boolean(), nullable=True, default=False),
sa.Column('is_system', sa.Boolean(), nullable=True, default=False),
sa.Column('is_active', sa.Boolean(), nullable=True, default=True),
sa.Column('sort_order', sa.Integer(), nullable=True, default=0),
sa.Column('created_by', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['created_by'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
# 创建审计规则集表
op.create_table(
'audit_rule_sets',
sa.Column('id', sa.String(), nullable=False),
sa.Column('name', sa.String(100), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('language', sa.String(50), nullable=True, default='all'),
sa.Column('rule_type', sa.String(50), nullable=True, default='custom'),
sa.Column('severity_weights', sa.Text(), nullable=True),
sa.Column('is_default', sa.Boolean(), nullable=True, default=False),
sa.Column('is_system', sa.Boolean(), nullable=True, default=False),
sa.Column('is_active', sa.Boolean(), nullable=True, default=True),
sa.Column('sort_order', sa.Integer(), nullable=True, default=0),
sa.Column('created_by', sa.String(), nullable=True),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['created_by'], ['users.id'], ),
sa.PrimaryKeyConstraint('id')
)
# 创建审计规则表
op.create_table(
'audit_rules',
sa.Column('id', sa.String(), nullable=False),
sa.Column('rule_set_id', sa.String(), nullable=False),
sa.Column('rule_code', sa.String(50), nullable=False),
sa.Column('name', sa.String(200), nullable=False),
sa.Column('description', sa.Text(), nullable=True),
sa.Column('category', sa.String(50), nullable=False),
sa.Column('severity', sa.String(20), nullable=True, default='medium'),
sa.Column('custom_prompt', sa.Text(), nullable=True),
sa.Column('fix_suggestion', sa.Text(), nullable=True),
sa.Column('reference_url', sa.String(500), nullable=True),
sa.Column('enabled', sa.Boolean(), nullable=True, default=True),
sa.Column('sort_order', sa.Integer(), nullable=True, default=0),
sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=True),
sa.Column('updated_at', sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(['rule_set_id'], ['audit_rule_sets.id'], ),
sa.PrimaryKeyConstraint('id')
)
# 创建索引
op.create_index('ix_prompt_templates_template_type', 'prompt_templates', ['template_type'])
op.create_index('ix_prompt_templates_is_system', 'prompt_templates', ['is_system'])
op.create_index('ix_audit_rule_sets_language', 'audit_rule_sets', ['language'])
op.create_index('ix_audit_rule_sets_rule_type', 'audit_rule_sets', ['rule_type'])
op.create_index('ix_audit_rules_category', 'audit_rules', ['category'])
op.create_index('ix_audit_rules_rule_code', 'audit_rules', ['rule_code'])
def downgrade() -> None:
op.drop_index('ix_audit_rules_rule_code', 'audit_rules')
op.drop_index('ix_audit_rules_category', 'audit_rules')
op.drop_index('ix_audit_rule_sets_rule_type', 'audit_rule_sets')
op.drop_index('ix_audit_rule_sets_language', 'audit_rule_sets')
op.drop_index('ix_prompt_templates_is_system', 'prompt_templates')
op.drop_index('ix_prompt_templates_template_type', 'prompt_templates')
op.drop_table('audit_rules')
op.drop_table('audit_rule_sets')
op.drop_table('prompt_templates')

View File

@ -1,5 +1,5 @@
from fastapi import APIRouter
from app.api.v1.endpoints import auth, users, projects, tasks, scan, members, config, database
from app.api.v1.endpoints import auth, users, projects, tasks, scan, members, config, database, prompts, rules
api_router = APIRouter()
api_router.include_router(auth.router, prefix="/auth", tags=["auth"])
@ -10,3 +10,5 @@ api_router.include_router(tasks.router, prefix="/tasks", tags=["tasks"])
api_router.include_router(scan.router, prefix="/scan", tags=["scan"])
api_router.include_router(config.router, prefix="/config", tags=["config"])
api_router.include_router(database.router, prefix="/database", tags=["database"])
api_router.include_router(prompts.router, prefix="/prompts", tags=["prompts"])
api_router.include_router(rules.router, prefix="/rules", tags=["rules"])

View File

@ -0,0 +1,351 @@
"""
提示词模板 API 端点
"""
import json
import time
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import func as sql_func
from app.api import deps
from app.db.session import get_db
from app.models.prompt_template import PromptTemplate
from app.models.user import User
from app.schemas.prompt_template import (
PromptTemplateCreate,
PromptTemplateUpdate,
PromptTemplateResponse,
PromptTemplateListResponse,
PromptTestRequest,
PromptTestResponse,
)
router = APIRouter()
@router.get("", response_model=PromptTemplateListResponse)
async def list_prompt_templates(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
template_type: Optional[str] = Query(None, description="模板类型过滤"),
is_active: Optional[bool] = Query(None, description="是否启用"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""获取提示词模板列表"""
query = select(PromptTemplate)
# 过滤条件:系统模板 + 当前用户创建的模板
query = query.where(
(PromptTemplate.is_system == True) |
(PromptTemplate.created_by == current_user.id)
)
if template_type:
query = query.where(PromptTemplate.template_type == template_type)
if is_active is not None:
query = query.where(PromptTemplate.is_active == is_active)
# 排序:系统模板优先,然后按排序权重和创建时间
query = query.order_by(
PromptTemplate.is_system.desc(),
PromptTemplate.is_default.desc(),
PromptTemplate.sort_order.asc(),
PromptTemplate.created_at.desc()
)
# 计数
count_query = select(sql_func.count()).select_from(query.subquery())
total = (await db.execute(count_query)).scalar()
# 分页
query = query.offset(skip).limit(limit)
result = await db.execute(query)
templates = result.scalars().all()
items = []
for t in templates:
variables = {}
if t.variables:
try:
variables = json.loads(t.variables)
except:
pass
items.append(PromptTemplateResponse(
id=t.id,
name=t.name,
description=t.description,
template_type=t.template_type,
content_zh=t.content_zh,
content_en=t.content_en,
variables=variables,
is_default=t.is_default,
is_system=t.is_system,
is_active=t.is_active,
sort_order=t.sort_order,
created_by=t.created_by,
created_at=t.created_at,
updated_at=t.updated_at,
))
return PromptTemplateListResponse(items=items, total=total)
@router.get("/{template_id}", response_model=PromptTemplateResponse)
async def get_prompt_template(
template_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""获取单个提示词模板"""
result = await db.execute(
select(PromptTemplate).where(PromptTemplate.id == template_id)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(status_code=404, detail="模板不存在")
# 检查权限
if not template.is_system and template.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权访问此模板")
variables = {}
if template.variables:
try:
variables = json.loads(template.variables)
except:
pass
return PromptTemplateResponse(
id=template.id,
name=template.name,
description=template.description,
template_type=template.template_type,
content_zh=template.content_zh,
content_en=template.content_en,
variables=variables,
is_default=template.is_default,
is_system=template.is_system,
is_active=template.is_active,
sort_order=template.sort_order,
created_by=template.created_by,
created_at=template.created_at,
updated_at=template.updated_at,
)
@router.post("", response_model=PromptTemplateResponse)
async def create_prompt_template(
template_in: PromptTemplateCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""创建提示词模板"""
template = PromptTemplate(
name=template_in.name,
description=template_in.description,
template_type=template_in.template_type,
content_zh=template_in.content_zh,
content_en=template_in.content_en,
variables=json.dumps(template_in.variables or {}),
is_active=template_in.is_active,
sort_order=template_in.sort_order,
is_system=False,
is_default=False,
created_by=current_user.id,
)
db.add(template)
await db.commit()
await db.refresh(template)
return PromptTemplateResponse(
id=template.id,
name=template.name,
description=template.description,
template_type=template.template_type,
content_zh=template.content_zh,
content_en=template.content_en,
variables=template_in.variables or {},
is_default=template.is_default,
is_system=template.is_system,
is_active=template.is_active,
sort_order=template.sort_order,
created_by=template.created_by,
created_at=template.created_at,
updated_at=template.updated_at,
)
@router.put("/{template_id}", response_model=PromptTemplateResponse)
async def update_prompt_template(
template_id: str,
template_in: PromptTemplateUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""更新提示词模板"""
result = await db.execute(
select(PromptTemplate).where(PromptTemplate.id == template_id)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(status_code=404, detail="模板不存在")
# 系统模板不允许修改核心内容,只能修改启用状态
if template.is_system:
if template_in.is_active is not None:
template.is_active = template_in.is_active
else:
raise HTTPException(status_code=403, detail="系统模板不允许修改")
else:
# 检查权限
if template.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权修改此模板")
# 更新字段
update_data = template_in.dict(exclude_unset=True)
for field, value in update_data.items():
if field == "variables" and value is not None:
setattr(template, field, json.dumps(value))
elif field != "is_default": # 不允许用户设置默认
setattr(template, field, value)
await db.commit()
await db.refresh(template)
variables = {}
if template.variables:
try:
variables = json.loads(template.variables)
except:
pass
return PromptTemplateResponse(
id=template.id,
name=template.name,
description=template.description,
template_type=template.template_type,
content_zh=template.content_zh,
content_en=template.content_en,
variables=variables,
is_default=template.is_default,
is_system=template.is_system,
is_active=template.is_active,
sort_order=template.sort_order,
created_by=template.created_by,
created_at=template.created_at,
updated_at=template.updated_at,
)
@router.delete("/{template_id}")
async def delete_prompt_template(
template_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""删除提示词模板"""
result = await db.execute(
select(PromptTemplate).where(PromptTemplate.id == template_id)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(status_code=404, detail="模板不存在")
if template.is_system:
raise HTTPException(status_code=403, detail="系统模板不允许删除")
if template.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权删除此模板")
await db.delete(template)
await db.commit()
return {"message": "模板已删除"}
@router.post("/test", response_model=PromptTestResponse)
async def test_prompt_template(
request: PromptTestRequest,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""测试提示词效果"""
from app.services.llm.service import LLMService
start_time = time.time()
try:
# 创建LLM服务实例
llm_service = LLMService()
# 使用自定义提示词进行分析
result = await llm_service.analyze_code_with_custom_prompt(
code=request.code,
language=request.language,
custom_prompt=request.content,
)
execution_time = time.time() - start_time
return PromptTestResponse(
success=True,
result=result,
execution_time=round(execution_time, 2),
)
except Exception as e:
execution_time = time.time() - start_time
return PromptTestResponse(
success=False,
error=str(e),
execution_time=round(execution_time, 2),
)
@router.post("/{template_id}/set-default")
async def set_default_template(
template_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""设置默认模板(仅管理员)"""
if not current_user.is_superuser:
raise HTTPException(status_code=403, detail="仅管理员可设置默认模板")
result = await db.execute(
select(PromptTemplate).where(PromptTemplate.id == template_id)
)
template = result.scalar_one_or_none()
if not template:
raise HTTPException(status_code=404, detail="模板不存在")
# 取消同类型的其他默认模板
await db.execute(
select(PromptTemplate)
.where(PromptTemplate.template_type == template.template_type)
.where(PromptTemplate.is_default == True)
)
same_type_defaults = (await db.execute(
select(PromptTemplate)
.where(PromptTemplate.template_type == template.template_type)
.where(PromptTemplate.is_default == True)
)).scalars().all()
for t in same_type_defaults:
t.is_default = False
# 设置新的默认模板
template.is_default = True
await db.commit()
return {"message": "已设置为默认模板"}

View File

@ -0,0 +1,728 @@
"""
审计规则 API 端点
"""
import json
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import JSONResponse
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import func as sql_func
from sqlalchemy.orm import selectinload
from app.api import deps
from app.db.session import get_db
from app.models.audit_rule import AuditRuleSet, AuditRule
from app.models.user import User
from app.schemas.audit_rule import (
AuditRuleCreate,
AuditRuleUpdate,
AuditRuleResponse,
AuditRuleSetCreate,
AuditRuleSetUpdate,
AuditRuleSetResponse,
AuditRuleSetListResponse,
AuditRuleSetExport,
AuditRuleSetImport,
)
router = APIRouter()
# ==================== 规则集 API ====================
@router.get("", response_model=AuditRuleSetListResponse)
async def list_rule_sets(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=100),
language: Optional[str] = Query(None, description="语言过滤"),
rule_type: Optional[str] = Query(None, description="类型过滤"),
is_active: Optional[bool] = Query(None, description="是否启用"),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""获取审计规则集列表"""
query = select(AuditRuleSet).options(selectinload(AuditRuleSet.rules))
# 过滤条件:系统规则集 + 当前用户创建的规则集
query = query.where(
(AuditRuleSet.is_system == True) |
(AuditRuleSet.created_by == current_user.id)
)
if language:
query = query.where(AuditRuleSet.language == language)
if rule_type:
query = query.where(AuditRuleSet.rule_type == rule_type)
if is_active is not None:
query = query.where(AuditRuleSet.is_active == is_active)
# 排序
query = query.order_by(
AuditRuleSet.is_system.desc(),
AuditRuleSet.is_default.desc(),
AuditRuleSet.sort_order.asc(),
AuditRuleSet.created_at.desc()
)
# 计数
count_query = select(sql_func.count()).select_from(
select(AuditRuleSet).where(
(AuditRuleSet.is_system == True) |
(AuditRuleSet.created_by == current_user.id)
).subquery()
)
total = (await db.execute(count_query)).scalar()
# 分页
query = query.offset(skip).limit(limit)
result = await db.execute(query)
rule_sets = result.scalars().unique().all()
items = []
for rs in rule_sets:
severity_weights = {"critical": 10, "high": 5, "medium": 2, "low": 1}
if rs.severity_weights:
try:
severity_weights = json.loads(rs.severity_weights)
except:
pass
rules = [
AuditRuleResponse(
id=r.id,
rule_set_id=r.rule_set_id,
rule_code=r.rule_code,
name=r.name,
description=r.description,
category=r.category,
severity=r.severity,
custom_prompt=r.custom_prompt,
fix_suggestion=r.fix_suggestion,
reference_url=r.reference_url,
enabled=r.enabled,
sort_order=r.sort_order,
created_at=r.created_at,
updated_at=r.updated_at,
)
for r in rs.rules
]
items.append(AuditRuleSetResponse(
id=rs.id,
name=rs.name,
description=rs.description,
language=rs.language,
rule_type=rs.rule_type,
severity_weights=severity_weights,
is_default=rs.is_default,
is_system=rs.is_system,
is_active=rs.is_active,
sort_order=rs.sort_order,
created_by=rs.created_by,
created_at=rs.created_at,
updated_at=rs.updated_at,
rules=rules,
rules_count=len(rules),
enabled_rules_count=len([r for r in rules if r.enabled]),
))
return AuditRuleSetListResponse(items=items, total=total)
@router.get("/{rule_set_id}", response_model=AuditRuleSetResponse)
async def get_rule_set(
rule_set_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""获取单个规则集"""
result = await db.execute(
select(AuditRuleSet)
.options(selectinload(AuditRuleSet.rules))
.where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
if not rule_set.is_system and rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权访问此规则集")
severity_weights = {"critical": 10, "high": 5, "medium": 2, "low": 1}
if rule_set.severity_weights:
try:
severity_weights = json.loads(rule_set.severity_weights)
except:
pass
rules = [
AuditRuleResponse(
id=r.id,
rule_set_id=r.rule_set_id,
rule_code=r.rule_code,
name=r.name,
description=r.description,
category=r.category,
severity=r.severity,
custom_prompt=r.custom_prompt,
fix_suggestion=r.fix_suggestion,
reference_url=r.reference_url,
enabled=r.enabled,
sort_order=r.sort_order,
created_at=r.created_at,
updated_at=r.updated_at,
)
for r in rule_set.rules
]
return AuditRuleSetResponse(
id=rule_set.id,
name=rule_set.name,
description=rule_set.description,
language=rule_set.language,
rule_type=rule_set.rule_type,
severity_weights=severity_weights,
is_default=rule_set.is_default,
is_system=rule_set.is_system,
is_active=rule_set.is_active,
sort_order=rule_set.sort_order,
created_by=rule_set.created_by,
created_at=rule_set.created_at,
updated_at=rule_set.updated_at,
rules=rules,
rules_count=len(rules),
enabled_rules_count=len([r for r in rules if r.enabled]),
)
@router.post("", response_model=AuditRuleSetResponse)
async def create_rule_set(
rule_set_in: AuditRuleSetCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""创建审计规则集"""
rule_set = AuditRuleSet(
name=rule_set_in.name,
description=rule_set_in.description,
language=rule_set_in.language,
rule_type=rule_set_in.rule_type,
severity_weights=json.dumps(rule_set_in.severity_weights or {}),
is_active=rule_set_in.is_active,
sort_order=rule_set_in.sort_order,
is_system=False,
is_default=False,
created_by=current_user.id,
)
db.add(rule_set)
await db.flush()
# 创建规则
rules = []
for rule_in in (rule_set_in.rules or []):
rule = AuditRule(
rule_set_id=rule_set.id,
rule_code=rule_in.rule_code,
name=rule_in.name,
description=rule_in.description,
category=rule_in.category,
severity=rule_in.severity,
custom_prompt=rule_in.custom_prompt,
fix_suggestion=rule_in.fix_suggestion,
reference_url=rule_in.reference_url,
enabled=rule_in.enabled,
sort_order=rule_in.sort_order,
)
db.add(rule)
rules.append(rule)
await db.commit()
await db.refresh(rule_set)
return AuditRuleSetResponse(
id=rule_set.id,
name=rule_set.name,
description=rule_set.description,
language=rule_set.language,
rule_type=rule_set.rule_type,
severity_weights=rule_set_in.severity_weights or {},
is_default=rule_set.is_default,
is_system=rule_set.is_system,
is_active=rule_set.is_active,
sort_order=rule_set.sort_order,
created_by=rule_set.created_by,
created_at=rule_set.created_at,
updated_at=rule_set.updated_at,
rules=[
AuditRuleResponse(
id=r.id,
rule_set_id=r.rule_set_id,
rule_code=r.rule_code,
name=r.name,
description=r.description,
category=r.category,
severity=r.severity,
custom_prompt=r.custom_prompt,
fix_suggestion=r.fix_suggestion,
reference_url=r.reference_url,
enabled=r.enabled,
sort_order=r.sort_order,
created_at=r.created_at,
updated_at=r.updated_at,
)
for r in rules
],
rules_count=len(rules),
enabled_rules_count=len([r for r in rules if r.enabled]),
)
@router.put("/{rule_set_id}", response_model=AuditRuleSetResponse)
async def update_rule_set(
rule_set_id: str,
rule_set_in: AuditRuleSetUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""更新审计规则集"""
result = await db.execute(
select(AuditRuleSet)
.options(selectinload(AuditRuleSet.rules))
.where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
if rule_set.is_system:
# 系统规则集只能修改启用状态
if rule_set_in.is_active is not None:
rule_set.is_active = rule_set_in.is_active
else:
raise HTTPException(status_code=403, detail="系统规则集不允许修改")
else:
if rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权修改此规则集")
update_data = rule_set_in.dict(exclude_unset=True)
for field, value in update_data.items():
if field == "severity_weights" and value is not None:
setattr(rule_set, field, json.dumps(value))
elif field != "is_default":
setattr(rule_set, field, value)
await db.commit()
await db.refresh(rule_set)
severity_weights = {"critical": 10, "high": 5, "medium": 2, "low": 1}
if rule_set.severity_weights:
try:
severity_weights = json.loads(rule_set.severity_weights)
except:
pass
rules = [
AuditRuleResponse(
id=r.id,
rule_set_id=r.rule_set_id,
rule_code=r.rule_code,
name=r.name,
description=r.description,
category=r.category,
severity=r.severity,
custom_prompt=r.custom_prompt,
fix_suggestion=r.fix_suggestion,
reference_url=r.reference_url,
enabled=r.enabled,
sort_order=r.sort_order,
created_at=r.created_at,
updated_at=r.updated_at,
)
for r in rule_set.rules
]
return AuditRuleSetResponse(
id=rule_set.id,
name=rule_set.name,
description=rule_set.description,
language=rule_set.language,
rule_type=rule_set.rule_type,
severity_weights=severity_weights,
is_default=rule_set.is_default,
is_system=rule_set.is_system,
is_active=rule_set.is_active,
sort_order=rule_set.sort_order,
created_by=rule_set.created_by,
created_at=rule_set.created_at,
updated_at=rule_set.updated_at,
rules=rules,
rules_count=len(rules),
enabled_rules_count=len([r for r in rules if r.enabled]),
)
@router.delete("/{rule_set_id}")
async def delete_rule_set(
rule_set_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""删除审计规则集"""
result = await db.execute(
select(AuditRuleSet).where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
if rule_set.is_system:
raise HTTPException(status_code=403, detail="系统规则集不允许删除")
if rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权删除此规则集")
await db.delete(rule_set)
await db.commit()
return {"message": "规则集已删除"}
@router.get("/{rule_set_id}/export")
async def export_rule_set(
rule_set_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""导出规则集为JSON"""
result = await db.execute(
select(AuditRuleSet)
.options(selectinload(AuditRuleSet.rules))
.where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
if not rule_set.is_system and rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权导出此规则集")
severity_weights = {"critical": 10, "high": 5, "medium": 2, "low": 1}
if rule_set.severity_weights:
try:
severity_weights = json.loads(rule_set.severity_weights)
except:
pass
export_data = {
"name": rule_set.name,
"description": rule_set.description,
"language": rule_set.language,
"rule_type": rule_set.rule_type,
"severity_weights": severity_weights,
"rules": [
{
"rule_code": r.rule_code,
"name": r.name,
"description": r.description,
"category": r.category,
"severity": r.severity,
"custom_prompt": r.custom_prompt,
"fix_suggestion": r.fix_suggestion,
"reference_url": r.reference_url,
"enabled": r.enabled,
"sort_order": r.sort_order,
}
for r in rule_set.rules
],
"export_version": "1.0",
}
return JSONResponse(
content=export_data,
headers={
"Content-Disposition": f'attachment; filename="{rule_set.name}.json"'
}
)
@router.post("/import", response_model=AuditRuleSetResponse)
async def import_rule_set(
import_data: AuditRuleSetImport,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""导入规则集"""
rule_set = AuditRuleSet(
name=import_data.name,
description=import_data.description,
language=import_data.language,
rule_type=import_data.rule_type,
severity_weights=json.dumps(import_data.severity_weights or {}),
is_active=True,
is_system=False,
is_default=False,
created_by=current_user.id,
)
db.add(rule_set)
await db.flush()
rules = []
for rule_in in import_data.rules:
rule = AuditRule(
rule_set_id=rule_set.id,
rule_code=rule_in.rule_code,
name=rule_in.name,
description=rule_in.description,
category=rule_in.category,
severity=rule_in.severity,
custom_prompt=rule_in.custom_prompt,
fix_suggestion=rule_in.fix_suggestion,
reference_url=rule_in.reference_url,
enabled=rule_in.enabled,
sort_order=rule_in.sort_order,
)
db.add(rule)
rules.append(rule)
await db.commit()
await db.refresh(rule_set)
return AuditRuleSetResponse(
id=rule_set.id,
name=rule_set.name,
description=rule_set.description,
language=rule_set.language,
rule_type=rule_set.rule_type,
severity_weights=import_data.severity_weights or {},
is_default=rule_set.is_default,
is_system=rule_set.is_system,
is_active=rule_set.is_active,
sort_order=rule_set.sort_order,
created_by=rule_set.created_by,
created_at=rule_set.created_at,
updated_at=rule_set.updated_at,
rules=[
AuditRuleResponse(
id=r.id,
rule_set_id=r.rule_set_id,
rule_code=r.rule_code,
name=r.name,
description=r.description,
category=r.category,
severity=r.severity,
custom_prompt=r.custom_prompt,
fix_suggestion=r.fix_suggestion,
reference_url=r.reference_url,
enabled=r.enabled,
sort_order=r.sort_order,
created_at=r.created_at,
updated_at=r.updated_at,
)
for r in rules
],
rules_count=len(rules),
enabled_rules_count=len([r for r in rules if r.enabled]),
)
# ==================== 单个规则 API ====================
@router.post("/{rule_set_id}/rules", response_model=AuditRuleResponse)
async def add_rule_to_set(
rule_set_id: str,
rule_in: AuditRuleCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""向规则集添加规则"""
result = await db.execute(
select(AuditRuleSet).where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
if rule_set.is_system:
raise HTTPException(status_code=403, detail="系统规则集不允许添加规则")
if rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权修改此规则集")
rule = AuditRule(
rule_set_id=rule_set_id,
rule_code=rule_in.rule_code,
name=rule_in.name,
description=rule_in.description,
category=rule_in.category,
severity=rule_in.severity,
custom_prompt=rule_in.custom_prompt,
fix_suggestion=rule_in.fix_suggestion,
reference_url=rule_in.reference_url,
enabled=rule_in.enabled,
sort_order=rule_in.sort_order,
)
db.add(rule)
await db.commit()
await db.refresh(rule)
return AuditRuleResponse(
id=rule.id,
rule_set_id=rule.rule_set_id,
rule_code=rule.rule_code,
name=rule.name,
description=rule.description,
category=rule.category,
severity=rule.severity,
custom_prompt=rule.custom_prompt,
fix_suggestion=rule.fix_suggestion,
reference_url=rule.reference_url,
enabled=rule.enabled,
sort_order=rule.sort_order,
created_at=rule.created_at,
updated_at=rule.updated_at,
)
@router.put("/{rule_set_id}/rules/{rule_id}", response_model=AuditRuleResponse)
async def update_rule(
rule_set_id: str,
rule_id: str,
rule_in: AuditRuleUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""更新规则"""
result = await db.execute(
select(AuditRuleSet).where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
if rule_set.is_system:
raise HTTPException(status_code=403, detail="系统规则集不允许修改规则")
if rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权修改此规则集")
result = await db.execute(
select(AuditRule).where(
AuditRule.id == rule_id,
AuditRule.rule_set_id == rule_set_id
)
)
rule = result.scalar_one_or_none()
if not rule:
raise HTTPException(status_code=404, detail="规则不存在")
update_data = rule_in.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(rule, field, value)
await db.commit()
await db.refresh(rule)
return AuditRuleResponse(
id=rule.id,
rule_set_id=rule.rule_set_id,
rule_code=rule.rule_code,
name=rule.name,
description=rule.description,
category=rule.category,
severity=rule.severity,
custom_prompt=rule.custom_prompt,
fix_suggestion=rule.fix_suggestion,
reference_url=rule.reference_url,
enabled=rule.enabled,
sort_order=rule.sort_order,
created_at=rule.created_at,
updated_at=rule.updated_at,
)
@router.delete("/{rule_set_id}/rules/{rule_id}")
async def delete_rule(
rule_set_id: str,
rule_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""删除规则"""
result = await db.execute(
select(AuditRuleSet).where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
if rule_set.is_system:
raise HTTPException(status_code=403, detail="系统规则集不允许删除规则")
if rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权修改此规则集")
result = await db.execute(
select(AuditRule).where(
AuditRule.id == rule_id,
AuditRule.rule_set_id == rule_set_id
)
)
rule = result.scalar_one_or_none()
if not rule:
raise HTTPException(status_code=404, detail="规则不存在")
await db.delete(rule)
await db.commit()
return {"message": "规则已删除"}
@router.put("/{rule_set_id}/rules/{rule_id}/toggle")
async def toggle_rule(
rule_set_id: str,
rule_id: str,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(deps.get_current_user),
) -> Any:
"""切换规则启用状态"""
result = await db.execute(
select(AuditRuleSet).where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if not rule_set:
raise HTTPException(status_code=404, detail="规则集不存在")
# 系统规则集也允许切换单个规则的启用状态
if not rule_set.is_system and rule_set.created_by != current_user.id:
raise HTTPException(status_code=403, detail="无权修改此规则集")
result = await db.execute(
select(AuditRule).where(
AuditRule.id == rule_id,
AuditRule.rule_set_id == rule_set_id
)
)
rule = result.scalar_one_or_none()
if not rule:
raise HTTPException(status_code=404, detail="规则不存在")
rule.enabled = not rule.enabled
await db.commit()
return {"enabled": rule.enabled, "message": f"规则已{'启用' if rule.enabled else '禁用'}"}

View File

@ -273,4 +273,12 @@ async def init_db(db: AsyncSession) -> None:
await create_demo_data(db, demo_user)
await db.commit()
# 初始化系统模板和规则
try:
from app.services.init_templates import init_templates_and_rules
await init_templates_and_rules(db)
except Exception as e:
logger.warning(f"初始化模板和规则跳过: {e}")
logger.info("数据库初始化完成")

View File

@ -2,5 +2,7 @@ from .user import User
from .project import Project, ProjectMember
from .audit import AuditTask, AuditIssue
from .analysis import InstantAnalysis
from .prompt_template import PromptTemplate
from .audit_rule import AuditRuleSet, AuditRule

View File

@ -0,0 +1,90 @@
"""
审计规则模型 - 存储自定义审计规范
"""
import uuid
from sqlalchemy import Column, String, Text, DateTime, ForeignKey, Boolean, Integer, Float
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class AuditRuleSet(Base):
"""审计规则集表"""
__tablename__ = "audit_rule_sets"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
name = Column(String(100), nullable=False) # 规则集名称
description = Column(Text, nullable=True) # 规则集描述
# 适用语言: all, python, javascript, java, go, etc.
language = Column(String(50), default="all")
# 规则集类型: security(安全), quality(质量), performance(性能), custom(自定义)
rule_type = Column(String(50), default="custom")
# 严重程度权重配置JSON格式
# {"critical": 10, "high": 5, "medium": 2, "low": 1}
severity_weights = Column(Text, default='{"critical": 10, "high": 5, "medium": 2, "low": 1}')
# 状态标记
is_default = Column(Boolean, default=False) # 是否默认规则集
is_system = Column(Boolean, default=False) # 是否系统内置
is_active = Column(Boolean, default=True) # 是否启用
# 排序权重
sort_order = Column(Integer, default=0)
# 创建者
created_by = Column(String, ForeignKey("users.id"), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
creator = relationship("User", foreign_keys=[created_by])
rules = relationship("AuditRule", back_populates="rule_set", cascade="all, delete-orphan")
class AuditRule(Base):
"""审计规则表"""
__tablename__ = "audit_rules"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
rule_set_id = Column(String, ForeignKey("audit_rule_sets.id"), nullable=False)
# 规则标识(唯一标识符,如 SEC001, PERF002
rule_code = Column(String(50), nullable=False)
# 规则名称
name = Column(String(200), nullable=False)
# 规则描述
description = Column(Text, nullable=True)
# 规则类别: security, bug, performance, style, maintainability
category = Column(String(50), nullable=False)
# 默认严重程度: critical, high, medium, low
severity = Column(String(20), default="medium")
# 自定义检测提示词可选用于增强LLM检测
custom_prompt = Column(Text, nullable=True)
# 修复建议模板
fix_suggestion = Column(Text, nullable=True)
# 参考链接如CWE、OWASP链接
reference_url = Column(String(500), nullable=True)
# 是否启用
enabled = Column(Boolean, default=True)
# 排序权重
sort_order = Column(Integer, default=0)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
rule_set = relationship("AuditRuleSet", back_populates="rules")

View File

@ -0,0 +1,45 @@
"""
提示词模板模型 - 存储自定义审计提示词
"""
import uuid
from sqlalchemy import Column, String, Text, DateTime, ForeignKey, Boolean, Integer
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from app.db.base import Base
class PromptTemplate(Base):
"""提示词模板表"""
__tablename__ = "prompt_templates"
id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
name = Column(String(100), nullable=False) # 模板名称
description = Column(Text, nullable=True) # 模板描述
# 模板类型: system(系统提示词), user(用户提示词), analysis(分析提示词)
template_type = Column(String(50), default="system")
# 提示词内容(支持中英文)
content_zh = Column(Text, nullable=True) # 中文提示词
content_en = Column(Text, nullable=True) # 英文提示词
# 模板变量说明JSON格式
variables = Column(Text, default="{}") # {"language": "编程语言", "code": "代码内容"}
# 状态标记
is_default = Column(Boolean, default=False) # 是否默认模板
is_system = Column(Boolean, default=False) # 是否系统内置(不可删除)
is_active = Column(Boolean, default=True) # 是否启用
# 排序权重
sort_order = Column(Integer, default=0)
# 创建者(系统模板为空)
created_by = Column(String, ForeignKey("users.id"), nullable=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# Relationships
creator = relationship("User", foreign_keys=[created_by])

View File

@ -0,0 +1,129 @@
"""
审计规则 Schema
"""
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from datetime import datetime
# ==================== 审计规则 ====================
class AuditRuleBase(BaseModel):
"""审计规则基础Schema"""
rule_code: str = Field(..., min_length=1, max_length=50, description="规则标识")
name: str = Field(..., min_length=1, max_length=200, description="规则名称")
description: Optional[str] = Field(None, description="规则描述")
category: str = Field(..., description="规则类别: security/bug/performance/style/maintainability")
severity: str = Field("medium", description="严重程度: critical/high/medium/low")
custom_prompt: Optional[str] = Field(None, description="自定义检测提示词")
fix_suggestion: Optional[str] = Field(None, description="修复建议模板")
reference_url: Optional[str] = Field(None, max_length=500, description="参考链接")
enabled: bool = Field(True, description="是否启用")
sort_order: int = Field(0, description="排序权重")
class AuditRuleCreate(AuditRuleBase):
"""创建审计规则"""
pass
class AuditRuleUpdate(BaseModel):
"""更新审计规则"""
rule_code: Optional[str] = Field(None, min_length=1, max_length=50)
name: Optional[str] = Field(None, min_length=1, max_length=200)
description: Optional[str] = None
category: Optional[str] = None
severity: Optional[str] = None
custom_prompt: Optional[str] = None
fix_suggestion: Optional[str] = None
reference_url: Optional[str] = None
enabled: Optional[bool] = None
sort_order: Optional[int] = None
class AuditRuleResponse(AuditRuleBase):
"""审计规则响应"""
id: str
rule_set_id: str
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
# ==================== 审计规则集 ====================
class AuditRuleSetBase(BaseModel):
"""审计规则集基础Schema"""
name: str = Field(..., min_length=1, max_length=100, description="规则集名称")
description: Optional[str] = Field(None, description="规则集描述")
language: str = Field("all", description="适用语言")
rule_type: str = Field("custom", description="规则集类型: security/quality/performance/custom")
severity_weights: Optional[Dict[str, int]] = Field(
default_factory=lambda: {"critical": 10, "high": 5, "medium": 2, "low": 1},
description="严重程度权重"
)
is_active: bool = Field(True, description="是否启用")
sort_order: int = Field(0, description="排序权重")
class AuditRuleSetCreate(AuditRuleSetBase):
"""创建审计规则集"""
rules: Optional[List[AuditRuleCreate]] = Field(default_factory=list, description="规则列表")
class AuditRuleSetUpdate(BaseModel):
"""更新审计规则集"""
name: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = None
language: Optional[str] = None
rule_type: Optional[str] = None
severity_weights: Optional[Dict[str, int]] = None
is_default: Optional[bool] = None
is_active: Optional[bool] = None
sort_order: Optional[int] = None
class AuditRuleSetResponse(AuditRuleSetBase):
"""审计规则集响应"""
id: str
is_default: bool = False
is_system: bool = False
created_by: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
rules: List[AuditRuleResponse] = Field(default_factory=list)
rules_count: int = 0
enabled_rules_count: int = 0
class Config:
from_attributes = True
class AuditRuleSetListResponse(BaseModel):
"""审计规则集列表响应"""
items: List[AuditRuleSetResponse]
total: int
class AuditRuleSetExport(BaseModel):
"""规则集导出格式"""
name: str
description: Optional[str]
language: str
rule_type: str
severity_weights: Dict[str, int]
rules: List[AuditRuleBase]
export_version: str = "1.0"
class AuditRuleSetImport(BaseModel):
"""规则集导入格式"""
name: str
description: Optional[str] = None
language: str = "all"
rule_type: str = "custom"
severity_weights: Optional[Dict[str, int]] = None
rules: List[AuditRuleCreate]

View File

@ -0,0 +1,71 @@
"""
提示词模板 Schema
"""
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from datetime import datetime
class PromptTemplateBase(BaseModel):
"""提示词模板基础Schema"""
name: str = Field(..., min_length=1, max_length=100, description="模板名称")
description: Optional[str] = Field(None, description="模板描述")
template_type: str = Field("system", description="模板类型: system/user/analysis")
content_zh: Optional[str] = Field(None, description="中文提示词")
content_en: Optional[str] = Field(None, description="英文提示词")
variables: Optional[Dict[str, str]] = Field(default_factory=dict, description="模板变量说明")
is_active: bool = Field(True, description="是否启用")
sort_order: int = Field(0, description="排序权重")
class PromptTemplateCreate(PromptTemplateBase):
"""创建提示词模板"""
pass
class PromptTemplateUpdate(BaseModel):
"""更新提示词模板"""
name: Optional[str] = Field(None, min_length=1, max_length=100)
description: Optional[str] = None
template_type: Optional[str] = None
content_zh: Optional[str] = None
content_en: Optional[str] = None
variables: Optional[Dict[str, str]] = None
is_default: Optional[bool] = None
is_active: Optional[bool] = None
sort_order: Optional[int] = None
class PromptTemplateResponse(PromptTemplateBase):
"""提示词模板响应"""
id: str
is_default: bool = False
is_system: bool = False
created_by: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Config:
from_attributes = True
class PromptTemplateListResponse(BaseModel):
"""提示词模板列表响应"""
items: List[PromptTemplateResponse]
total: int
class PromptTestRequest(BaseModel):
"""提示词测试请求"""
content: str = Field(..., description="提示词内容")
language: str = Field("python", description="编程语言")
code: str = Field(..., description="测试代码")
class PromptTestResponse(BaseModel):
"""提示词测试响应"""
success: bool
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
execution_time: Optional[float] = None

View File

@ -0,0 +1,617 @@
"""
初始化系统预置的提示词模板和审计规则
"""
import json
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.models.prompt_template import PromptTemplate
from app.models.audit_rule import AuditRuleSet, AuditRule
logger = logging.getLogger(__name__)
# ==================== 系统提示词模板 ====================
SYSTEM_PROMPT_TEMPLATES = [
{
"name": "默认代码审计",
"description": "全面的代码审计提示词,涵盖安全、性能、代码质量等多个维度",
"template_type": "system",
"is_default": True,
"sort_order": 0,
"variables": {"language": "编程语言", "code": "代码内容"},
"content_zh": """你是一个专业的代码审计助手。请从以下维度全面分析代码:
- 安全漏洞SQL注入XSS命令注入路径遍历SSRFXXE反序列化硬编码密钥等
- 潜在的 Bug 和逻辑错误
- 性能问题和优化建议
- 编码规范和代码风格
- 可维护性和可读性
- 最佳实践和设计模式
请尽可能多地找出代码中的所有问题不要遗漏任何安全漏洞或潜在风险""",
"content_en": """You are a professional code auditing assistant. Please comprehensively analyze the code from the following dimensions:
- Security vulnerabilities (SQL injection, XSS, command injection, path traversal, SSRF, XXE, deserialization, hardcoded secrets, etc.)
- Potential bugs and logical errors
- Performance issues and optimization suggestions
- Coding standards and code style
- Maintainability and readability
- Best practices and design patterns
Find as many issues as possible! Do NOT miss any security vulnerabilities or potential risks!"""
},
{
"name": "安全专项审计",
"description": "专注于安全漏洞检测的提示词模板",
"template_type": "system",
"is_default": False,
"sort_order": 1,
"variables": {"language": "编程语言", "code": "代码内容"},
"content_zh": """你是一个专业的安全审计专家。请专注于检测以下安全问题:
注入类漏洞
- SQL注入包括盲注时间盲注联合查询注入
- 命令注入OS命令执行
- LDAP注入
- XPath注入
- NoSQL注入
跨站脚本XSS
- 反射型XSS
- 存储型XSS
- DOM型XSS
认证与授权
- 硬编码凭证
- 弱密码策略
- 会话管理问题
- 权限绕过
敏感数据
- 敏感信息泄露
- 不安全的加密
- 明文传输敏感数据
其他安全问题
- SSRF服务端请求伪造
- XXEXML外部实体注入
- 反序列化漏洞
- 路径遍历
- 文件上传漏洞
- CSRF跨站请求伪造
请详细说明每个漏洞的风险等级利用方式和修复建议""",
"content_en": """You are a professional security audit expert. Please focus on detecting the following security issues:
Injection Vulnerabilities
- SQL Injection (including blind, time-based, union-based)
- Command Injection (OS command execution)
- LDAP Injection
- XPath Injection
- NoSQL Injection
Cross-Site Scripting (XSS)
- Reflected XSS
- Stored XSS
- DOM-based XSS
Authentication & Authorization
- Hardcoded credentials
- Weak password policies
- Session management issues
- Authorization bypass
Sensitive Data
- Sensitive information disclosure
- Insecure cryptography
- Plaintext transmission of sensitive data
Other Security Issues
- SSRF (Server-Side Request Forgery)
- XXE (XML External Entity Injection)
- Deserialization vulnerabilities
- Path traversal
- File upload vulnerabilities
- CSRF (Cross-Site Request Forgery)
Please provide detailed risk level, exploitation method, and remediation suggestions for each vulnerability."""
},
{
"name": "性能优化审计",
"description": "专注于性能问题检测的提示词模板",
"template_type": "system",
"is_default": False,
"sort_order": 2,
"variables": {"language": "编程语言", "code": "代码内容"},
"content_zh": """你是一个专业的性能优化专家。请专注于检测以下性能问题:
数据库性能
- N+1查询问题
- 缺少索引
- 不必要的全表扫描
- 大量数据一次性加载
- 未使用连接池
内存问题
- 内存泄漏
- 大对象未及时释放
- 缓存使用不当
- 循环中创建大量对象
算法效率
- 时间复杂度过高
- 不必要的重复计算
- 可优化的循环
- 递归深度过大
并发问题
- 线程安全问题
- 死锁风险
- 资源竞争
- 不必要的同步
I/O性能
- 同步阻塞I/O
- 未使用缓冲
- 频繁的小文件操作
- 网络请求未优化
请提供具体的优化建议和预期的性能提升""",
"content_en": """You are a professional performance optimization expert. Please focus on detecting the following performance issues:
Database Performance
- N+1 query problems
- Missing indexes
- Unnecessary full table scans
- Loading large amounts of data at once
- Not using connection pools
Memory Issues
- Memory leaks
- Large objects not released timely
- Improper cache usage
- Creating many objects in loops
Algorithm Efficiency
- High time complexity
- Unnecessary repeated calculations
- Optimizable loops
- Excessive recursion depth
Concurrency Issues
- Thread safety problems
- Deadlock risks
- Resource contention
- Unnecessary synchronization
I/O Performance
- Synchronous blocking I/O
- Not using buffers
- Frequent small file operations
- Unoptimized network requests
Please provide specific optimization suggestions and expected performance improvements."""
},
{
"name": "代码质量审计",
"description": "专注于代码质量和可维护性的提示词模板",
"template_type": "system",
"is_default": False,
"sort_order": 3,
"variables": {"language": "编程语言", "code": "代码内容"},
"content_zh": """你是一个专业的代码质量审计专家。请专注于检测以下代码质量问题:
代码规范
- 命名不规范变量函数
- 代码格式不一致
- 注释缺失或过时
- 魔法数字/字符串
代码结构
- 函数过长超过50行
- 类职责不单一
- 嵌套层级过深
- 重复代码
可维护性
- 高耦合低内聚
- 缺少错误处理
- 硬编码配置
- 缺少日志记录
设计模式
- 违反SOLID原则
- 可使用设计模式优化的场景
- 过度设计
测试相关
- 难以测试的代码
- 缺少边界条件处理
- 依赖注入问题
请提供具体的重构建议和代码示例""",
"content_en": """You are a professional code quality audit expert. Please focus on detecting the following code quality issues:
Code Standards
- Non-standard naming (variables, functions, classes)
- Inconsistent code formatting
- Missing or outdated comments
- Magic numbers/strings
Code Structure
- Functions too long (over 50 lines)
- Classes with multiple responsibilities
- Deep nesting levels
- Duplicate code
Maintainability
- High coupling, low cohesion
- Missing error handling
- Hardcoded configurations
- Missing logging
Design Patterns
- SOLID principle violations
- Scenarios that could benefit from design patterns
- Over-engineering
Testing Related
- Hard-to-test code
- Missing boundary condition handling
- Dependency injection issues
Please provide specific refactoring suggestions and code examples."""
},
]
# ==================== 系统审计规则集 ====================
SYSTEM_RULE_SETS = [
{
"name": "OWASP Top 10",
"description": "基于 OWASP Top 10 2021 的安全审计规则集",
"language": "all",
"rule_type": "security",
"is_default": True,
"sort_order": 0,
"severity_weights": {"critical": 10, "high": 5, "medium": 2, "low": 1},
"rules": [
{
"rule_code": "A01",
"name": "访问控制失效",
"description": "检测权限绕过、越权访问、IDOR等访问控制问题",
"category": "security",
"severity": "critical",
"custom_prompt": "检查是否存在访问控制失效问题权限检查缺失、越权访问、IDOR不安全的直接对象引用、CORS配置错误",
"fix_suggestion": "实施最小权限原则,在服务端进行权限验证,使用基于角色的访问控制(RBAC)",
"reference_url": "https://owasp.org/Top10/A01_2021-Broken_Access_Control/",
},
{
"rule_code": "A02",
"name": "加密机制失效",
"description": "检测弱加密、明文传输、密钥管理不当等问题",
"category": "security",
"severity": "critical",
"custom_prompt": "检查是否存在加密问题:使用弱加密算法(MD5/SHA1/DES)、明文存储密码、硬编码密钥、不安全的随机数生成",
"fix_suggestion": "使用强加密算法(AES-256/RSA-2048),使用安全的密码哈希(bcrypt/Argon2),妥善管理密钥",
"reference_url": "https://owasp.org/Top10/A02_2021-Cryptographic_Failures/",
},
{
"rule_code": "A03",
"name": "注入攻击",
"description": "检测SQL注入、命令注入、LDAP注入等注入漏洞",
"category": "security",
"severity": "critical",
"custom_prompt": "检查是否存在注入漏洞SQL注入、命令注入、LDAP注入、XPath注入、NoSQL注入、表达式语言注入",
"fix_suggestion": "使用参数化查询输入验证和转义使用ORM框架最小权限原则",
"reference_url": "https://owasp.org/Top10/A03_2021-Injection/",
},
{
"rule_code": "A04",
"name": "不安全设计",
"description": "检测业务逻辑漏洞、缺少安全控制等设计问题",
"category": "security",
"severity": "high",
"custom_prompt": "检查是否存在不安全的设计:缺少速率限制、业务逻辑漏洞、缺少输入验证、信任边界不清",
"fix_suggestion": "采用安全设计原则,威胁建模,实施深度防御",
"reference_url": "https://owasp.org/Top10/A04_2021-Insecure_Design/",
},
{
"rule_code": "A05",
"name": "安全配置错误",
"description": "检测默认配置、不必要的功能、错误的权限设置",
"category": "security",
"severity": "high",
"custom_prompt": "检查是否存在安全配置错误:默认凭证、不必要的功能启用、详细错误信息泄露、缺少安全头",
"fix_suggestion": "最小化安装,禁用不必要功能,定期审查配置,自动化配置检查",
"reference_url": "https://owasp.org/Top10/A05_2021-Security_Misconfiguration/",
},
{
"rule_code": "A06",
"name": "易受攻击和过时的组件",
"description": "检测使用已知漏洞的依赖库",
"category": "security",
"severity": "high",
"custom_prompt": "检查是否使用了已知漏洞的组件:过时的依赖库、未修补的漏洞、不安全的第三方组件",
"fix_suggestion": "定期更新依赖,使用依赖扫描工具,订阅安全公告",
"reference_url": "https://owasp.org/Top10/A06_2021-Vulnerable_and_Outdated_Components/",
},
{
"rule_code": "A07",
"name": "身份认证失效",
"description": "检测弱密码、会话管理问题、凭证泄露",
"category": "security",
"severity": "critical",
"custom_prompt": "检查是否存在身份认证问题:弱密码策略、会话固定、凭证明文存储、缺少多因素认证",
"fix_suggestion": "实施强密码策略使用MFA安全的会话管理防止暴力破解",
"reference_url": "https://owasp.org/Top10/A07_2021-Identification_and_Authentication_Failures/",
},
{
"rule_code": "A08",
"name": "软件和数据完整性失效",
"description": "检测不安全的反序列化、CI/CD安全问题",
"category": "security",
"severity": "critical",
"custom_prompt": "检查是否存在完整性问题不安全的反序列化、未验证的更新、CI/CD管道安全",
"fix_suggestion": "验证数据完整性,使用数字签名,安全的反序列化",
"reference_url": "https://owasp.org/Top10/A08_2021-Software_and_Data_Integrity_Failures/",
},
{
"rule_code": "A09",
"name": "安全日志和监控失效",
"description": "检测日志记录不足、监控缺失",
"category": "security",
"severity": "medium",
"custom_prompt": "检查是否存在日志监控问题:缺少安全日志、敏感信息记录到日志、缺少告警机制",
"fix_suggestion": "记录安全相关事件,实施监控和告警,定期审查日志",
"reference_url": "https://owasp.org/Top10/A09_2021-Security_Logging_and_Monitoring_Failures/",
},
{
"rule_code": "A10",
"name": "服务端请求伪造(SSRF)",
"description": "检测SSRF漏洞",
"category": "security",
"severity": "high",
"custom_prompt": "检查是否存在SSRF漏洞未验证的URL输入、内网资源访问、云元数据访问",
"fix_suggestion": "验证和过滤URL使用白名单禁用不必要的协议",
"reference_url": "https://owasp.org/Top10/A10_2021-Server-Side_Request_Forgery_%28SSRF%29/",
},
]
},
{
"name": "代码质量规则",
"description": "通用代码质量检查规则集",
"language": "all",
"rule_type": "quality",
"is_default": False,
"sort_order": 1,
"severity_weights": {"critical": 10, "high": 5, "medium": 2, "low": 1},
"rules": [
{
"rule_code": "CQ001",
"name": "函数过长",
"description": "函数超过50行建议拆分",
"category": "maintainability",
"severity": "medium",
"custom_prompt": "检查函数是否过长超过50行是否应该拆分为更小的函数",
"fix_suggestion": "将大函数拆分为多个小函数,每个函数只做一件事",
},
{
"rule_code": "CQ002",
"name": "重复代码",
"description": "检测重复的代码块",
"category": "maintainability",
"severity": "medium",
"custom_prompt": "检查是否存在重复的代码块,可以提取为公共函数或类",
"fix_suggestion": "提取重复代码为公共函数、类或模块",
},
{
"rule_code": "CQ003",
"name": "嵌套过深",
"description": "代码嵌套层级超过4层",
"category": "maintainability",
"severity": "low",
"custom_prompt": "检查代码嵌套是否过深超过4层影响可读性",
"fix_suggestion": "使用早返回、提取函数等方式减少嵌套",
},
{
"rule_code": "CQ004",
"name": "魔法数字",
"description": "代码中使用未命名的常量",
"category": "style",
"severity": "low",
"custom_prompt": "检查是否存在魔法数字或魔法字符串,应该定义为常量",
"fix_suggestion": "将魔法数字定义为有意义的常量",
},
{
"rule_code": "CQ005",
"name": "缺少错误处理",
"description": "缺少异常捕获或错误处理",
"category": "bug",
"severity": "high",
"custom_prompt": "检查是否缺少必要的错误处理,可能导致程序崩溃",
"fix_suggestion": "添加适当的try-catch或错误检查",
},
{
"rule_code": "CQ006",
"name": "未使用的变量",
"description": "声明但未使用的变量",
"category": "style",
"severity": "low",
"custom_prompt": "检查是否存在声明但未使用的变量",
"fix_suggestion": "删除未使用的变量或使用它们",
},
{
"rule_code": "CQ007",
"name": "命名不规范",
"description": "变量、函数、类命名不符合规范",
"category": "style",
"severity": "low",
"custom_prompt": "检查命名是否符合语言规范和最佳实践",
"fix_suggestion": "使用有意义的、符合规范的命名",
},
{
"rule_code": "CQ008",
"name": "注释缺失",
"description": "复杂逻辑缺少必要注释",
"category": "maintainability",
"severity": "low",
"custom_prompt": "检查复杂逻辑是否缺少必要的注释说明",
"fix_suggestion": "为复杂逻辑添加清晰的注释",
},
]
},
{
"name": "性能优化规则",
"description": "性能问题检测规则集",
"language": "all",
"rule_type": "performance",
"is_default": False,
"sort_order": 2,
"severity_weights": {"critical": 10, "high": 5, "medium": 2, "low": 1},
"rules": [
{
"rule_code": "PERF001",
"name": "N+1查询",
"description": "检测数据库N+1查询问题",
"category": "performance",
"severity": "high",
"custom_prompt": "检查是否存在N+1查询问题在循环中执行数据库查询",
"fix_suggestion": "使用JOIN查询或批量查询替代循环查询",
},
{
"rule_code": "PERF002",
"name": "内存泄漏",
"description": "检测潜在的内存泄漏",
"category": "performance",
"severity": "critical",
"custom_prompt": "检查是否存在内存泄漏:未关闭的资源、循环引用、大对象未释放",
"fix_suggestion": "使用try-finally或with语句确保资源释放",
},
{
"rule_code": "PERF003",
"name": "低效算法",
"description": "检测时间复杂度过高的算法",
"category": "performance",
"severity": "medium",
"custom_prompt": "检查是否存在低效算法如O(n²)可优化为O(n)或O(nlogn)",
"fix_suggestion": "使用更高效的算法或数据结构",
},
{
"rule_code": "PERF004",
"name": "不必要的对象创建",
"description": "在循环中创建不必要的对象",
"category": "performance",
"severity": "medium",
"custom_prompt": "检查是否在循环中创建不必要的对象,应该移到循环外",
"fix_suggestion": "将对象创建移到循环外部,或使用对象池",
},
{
"rule_code": "PERF005",
"name": "同步阻塞",
"description": "检测同步阻塞操作",
"category": "performance",
"severity": "medium",
"custom_prompt": "检查是否存在同步阻塞操作,应该使用异步方式",
"fix_suggestion": "使用异步I/O或多线程处理",
},
]
},
]
async def init_system_templates(db: AsyncSession) -> None:
"""初始化系统提示词模板"""
for template_data in SYSTEM_PROMPT_TEMPLATES:
# 检查是否已存在
result = await db.execute(
select(PromptTemplate).where(
PromptTemplate.name == template_data["name"],
PromptTemplate.is_system == True
)
)
existing = result.scalar_one_or_none()
if not existing:
template = PromptTemplate(
name=template_data["name"],
description=template_data["description"],
template_type=template_data["template_type"],
content_zh=template_data["content_zh"],
content_en=template_data["content_en"],
variables=json.dumps(template_data.get("variables", {})),
is_default=template_data.get("is_default", False),
is_system=True,
is_active=True,
sort_order=template_data.get("sort_order", 0),
)
db.add(template)
logger.info(f"✓ 创建系统提示词模板: {template_data['name']}")
await db.flush()
async def init_system_rule_sets(db: AsyncSession) -> None:
"""初始化系统审计规则集"""
for rule_set_data in SYSTEM_RULE_SETS:
# 检查是否已存在
result = await db.execute(
select(AuditRuleSet).where(
AuditRuleSet.name == rule_set_data["name"],
AuditRuleSet.is_system == True
)
)
existing = result.scalar_one_or_none()
if not existing:
rule_set = AuditRuleSet(
name=rule_set_data["name"],
description=rule_set_data["description"],
language=rule_set_data["language"],
rule_type=rule_set_data["rule_type"],
severity_weights=json.dumps(rule_set_data.get("severity_weights", {})),
is_default=rule_set_data.get("is_default", False),
is_system=True,
is_active=True,
sort_order=rule_set_data.get("sort_order", 0),
)
db.add(rule_set)
await db.flush()
# 创建规则
for rule_data in rule_set_data.get("rules", []):
rule = AuditRule(
rule_set_id=rule_set.id,
rule_code=rule_data["rule_code"],
name=rule_data["name"],
description=rule_data.get("description"),
category=rule_data["category"],
severity=rule_data.get("severity", "medium"),
custom_prompt=rule_data.get("custom_prompt"),
fix_suggestion=rule_data.get("fix_suggestion"),
reference_url=rule_data.get("reference_url"),
enabled=True,
sort_order=rule_data.get("sort_order", 0),
)
db.add(rule)
logger.info(f"✓ 创建系统规则集: {rule_set_data['name']} ({len(rule_set_data.get('rules', []))} 条规则)")
await db.flush()
async def init_templates_and_rules(db: AsyncSession) -> None:
"""初始化所有系统模板和规则"""
logger.info("开始初始化系统模板和规则...")
try:
await init_system_templates(db)
await init_system_rule_sets(db)
await db.commit()
logger.info("✓ 系统模板和规则初始化完成")
except Exception as e:
logger.warning(f"初始化模板和规则时出错(可能表不存在): {e}")
await db.rollback()

View File

@ -641,6 +641,186 @@ Please analyze the following code:
}
}
async def analyze_code_with_custom_prompt(
self,
code: str,
language: str,
custom_prompt: str,
rules: Optional[list] = None
) -> Dict[str, Any]:
"""
使用自定义提示词分析代码
Args:
code: 要分析的代码
language: 编程语言
custom_prompt: 自定义系统提示词
rules: 可选的审计规则列表
"""
output_language = self._get_output_language()
is_chinese = output_language == 'zh-CN'
# 添加行号
code_with_lines = '\n'.join(
f"{i+1}| {line}" for i, line in enumerate(code.split('\n'))
)
# 构建规则提示词
rules_prompt = ""
if rules:
rules_prompt = "\n\n【审计规则】请特别关注以下规则:\n"
for rule in rules:
if rule.get('enabled', True):
rules_prompt += f"- [{rule.get('rule_code', '')}] {rule.get('name', '')}: {rule.get('description', '')}\n"
if rule.get('custom_prompt'):
rules_prompt += f" 检测要点: {rule.get('custom_prompt')}\n"
# JSON Schema
schema = """{
"issues": [
{
"type": "security|bug|performance|style|maintainability",
"severity": "critical|high|medium|low",
"title": "string",
"description": "string",
"suggestion": "string",
"line": 1,
"column": 1,
"code_snippet": "string",
"rule_code": "string (optional, if matched a specific rule)"
}
],
"quality_score": 0-100,
"summary": {
"total_issues": number,
"critical_issues": number,
"high_issues": number,
"medium_issues": number,
"low_issues": number
}
}"""
# 构建完整的系统提示词
format_instruction = f"""
输出格式要求
1. 必须只输出纯JSON对象
2. 禁止在JSON前后添加任何文字说明markdown标记
3. 输出格式必须符合以下 JSON Schema
{schema}
{rules_prompt}"""
full_system_prompt = custom_prompt + format_instruction
# 构建用户提示词
if is_chinese:
user_prompt = f"""编程语言: {language}
代码已标注行号格式行号| 代码内容请根据行号准确填写 line 字段
请分析以下代码:
{code_with_lines}"""
else:
user_prompt = f"""Programming Language: {language}
Code is annotated with line numbers (format: lineNumber| code), please fill the 'line' field accurately.
Please analyze the following code:
{code_with_lines}"""
try:
adapter = LLMFactory.create_adapter(self.config)
request = LLMRequest(
messages=[
LLMMessage(role="system", content=full_system_prompt),
LLMMessage(role="user", content=user_prompt)
],
temperature=0.1,
)
response = await adapter.complete(request)
content = response.content
if not content or not content.strip():
raise Exception("LLM返回空响应")
result = self._parse_json(content)
return result
except Exception as e:
logger.error(f"Custom prompt analysis failed: {e}", exc_info=True)
raise
async def analyze_code_with_rules(
self,
code: str,
language: str,
rule_set_id: Optional[str] = None,
prompt_template_id: Optional[str] = None,
db_session = None
) -> Dict[str, Any]:
"""
使用指定的规则集和提示词模板分析代码
Args:
code: 要分析的代码
language: 编程语言
rule_set_id: 规则集ID可选
prompt_template_id: 提示词模板ID可选
db_session: 数据库会话
"""
custom_prompt = None
rules = None
if db_session:
from sqlalchemy.future import select
from sqlalchemy.orm import selectinload
# 获取提示词模板
if prompt_template_id:
from app.models.prompt_template import PromptTemplate
result = await db_session.execute(
select(PromptTemplate).where(PromptTemplate.id == prompt_template_id)
)
template = result.scalar_one_or_none()
if template:
output_language = self._get_output_language()
custom_prompt = template.content_zh if output_language == 'zh-CN' else template.content_en
# 获取规则集
if rule_set_id:
from app.models.audit_rule import AuditRuleSet
result = await db_session.execute(
select(AuditRuleSet)
.options(selectinload(AuditRuleSet.rules))
.where(AuditRuleSet.id == rule_set_id)
)
rule_set = result.scalar_one_or_none()
if rule_set and rule_set.rules:
rules = [
{
"rule_code": r.rule_code,
"name": r.name,
"description": r.description,
"category": r.category,
"severity": r.severity,
"custom_prompt": r.custom_prompt,
"enabled": r.enabled,
}
for r in rule_set.rules if r.enabled
]
# 如果有自定义提示词,使用自定义分析
if custom_prompt:
return await self.analyze_code_with_custom_prompt(code, language, custom_prompt, rules)
# 否则使用默认分析
return await self.analyze_code(code, language)
# 全局服务实例
llm_service = LLMService()

View File

@ -8,6 +8,8 @@ import TaskDetail from "@/pages/TaskDetail";
import AdminDashboard from "@/pages/AdminDashboard";
import LogsPage from "@/pages/LogsPage";
import Account from "@/pages/Account";
import AuditRules from "@/pages/AuditRules";
import PromptManager from "@/pages/PromptManager";
import type { ReactNode } from 'react';
export interface RouteConfig {
@ -54,6 +56,18 @@ const routes: RouteConfig[] = [
element: <TaskDetail />,
visible: false,
},
{
name: "审计规则",
path: "/audit-rules",
element: <AuditRules />,
visible: true,
},
{
name: "提示词管理",
path: "/prompts",
element: <PromptManager />,
visible: true,
},
{
name: "系统管理",
path: "/admin",

View File

@ -14,7 +14,9 @@ import {
ChevronLeft,
ChevronRight,
Github,
UserCircle
UserCircle,
Shield,
MessageSquare
} from "lucide-react";
import routes from "@/app/routes";
import { version } from "../../../package.json";
@ -25,6 +27,8 @@ const routeIcons: Record<string, React.ReactNode> = {
"/projects": <FolderGit2 className="w-5 h-5" />,
"/instant-analysis": <Zap className="w-5 h-5" />,
"/audit-tasks": <ListTodo className="w-5 h-5" />,
"/audit-rules": <Shield className="w-5 h-5" />,
"/prompts": <MessageSquare className="w-5 h-5" />,
"/admin": <Settings className="w-5 h-5" />,
"/recycle-bin": <Trash2 className="w-5 h-5" />,
"/logs": <FileText className="w-5 h-5" />,

View File

@ -0,0 +1,678 @@
/**
*
*/
import { useState, useEffect } from 'react';
import { Button } from '@/components/ui/button';
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card';
import { Badge } from '@/components/ui/badge';
import { Switch } from '@/components/ui/switch';
import { Input } from '@/components/ui/input';
import { Textarea } from '@/components/ui/textarea';
import { Label } from '@/components/ui/label';
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/ui/select';
import { Dialog, DialogContent, DialogDescription, DialogFooter, DialogHeader, DialogTitle } from '@/components/ui/dialog';
import { ScrollArea } from '@/components/ui/scroll-area';
import { useToast } from '@/shared/hooks/use-toast';
import {
Plus,
Trash2,
Edit,
Download,
Upload,
Shield,
Bug,
Zap,
Code,
Settings,
ChevronDown,
ChevronRight,
ExternalLink,
} from 'lucide-react';
import {
getRuleSets,
createRuleSet,
updateRuleSet,
deleteRuleSet,
exportRuleSet,
importRuleSet,
addRuleToSet,
updateRule,
deleteRule,
toggleRule,
type AuditRuleSet,
type AuditRule,
type AuditRuleSetCreate,
type AuditRuleCreate,
} from '@/shared/api/rules';
const CATEGORIES = [
{ value: 'security', label: '安全', icon: Shield, color: 'text-red-500' },
{ value: 'bug', label: 'Bug', icon: Bug, color: 'text-orange-500' },
{ value: 'performance', label: '性能', icon: Zap, color: 'text-yellow-500' },
{ value: 'style', label: '代码风格', icon: Code, color: 'text-blue-500' },
{ value: 'maintainability', label: '可维护性', icon: Settings, color: 'text-purple-500' },
];
const SEVERITIES = [
{ value: 'critical', label: '严重', color: 'bg-red-500' },
{ value: 'high', label: '高', color: 'bg-orange-500' },
{ value: 'medium', label: '中', color: 'bg-yellow-500' },
{ value: 'low', label: '低', color: 'bg-blue-500' },
];
const LANGUAGES = [
{ value: 'all', label: '所有语言' },
{ value: 'python', label: 'Python' },
{ value: 'javascript', label: 'JavaScript' },
{ value: 'typescript', label: 'TypeScript' },
{ value: 'java', label: 'Java' },
{ value: 'go', label: 'Go' },
{ value: 'rust', label: 'Rust' },
{ value: 'cpp', label: 'C/C++' },
];
const RULE_TYPES = [
{ value: 'security', label: '安全规则' },
{ value: 'quality', label: '质量规则' },
{ value: 'performance', label: '性能规则' },
{ value: 'custom', label: '自定义规则' },
];
export default function AuditRules() {
const { toast } = useToast();
const [ruleSets, setRuleSets] = useState<AuditRuleSet[]>([]);
const [loading, setLoading] = useState(true);
const [expandedSets, setExpandedSets] = useState<Set<string>>(new Set());
// 对话框状态
const [showCreateDialog, setShowCreateDialog] = useState(false);
const [showEditDialog, setShowEditDialog] = useState(false);
const [showRuleDialog, setShowRuleDialog] = useState(false);
const [showImportDialog, setShowImportDialog] = useState(false);
const [selectedRuleSet, setSelectedRuleSet] = useState<AuditRuleSet | null>(null);
const [selectedRule, setSelectedRule] = useState<AuditRule | null>(null);
// 表单状态
const [ruleSetForm, setRuleSetForm] = useState<AuditRuleSetCreate>({
name: '',
description: '',
language: 'all',
rule_type: 'custom',
});
const [ruleForm, setRuleForm] = useState<AuditRuleCreate>({
rule_code: '',
name: '',
description: '',
category: 'security',
severity: 'medium',
custom_prompt: '',
fix_suggestion: '',
reference_url: '',
enabled: true,
});
const [importJson, setImportJson] = useState('');
useEffect(() => {
loadRuleSets();
}, []);
const loadRuleSets = async () => {
try {
setLoading(true);
const response = await getRuleSets();
setRuleSets(response.items);
} catch (error) {
toast({ title: '加载失败', description: '无法加载规则集', variant: 'destructive' });
} finally {
setLoading(false);
}
};
const toggleExpand = (id: string) => {
const newExpanded = new Set(expandedSets);
if (newExpanded.has(id)) {
newExpanded.delete(id);
} else {
newExpanded.add(id);
}
setExpandedSets(newExpanded);
};
const handleCreateRuleSet = async () => {
try {
await createRuleSet(ruleSetForm);
toast({ title: '创建成功', description: '规则集已创建' });
setShowCreateDialog(false);
setRuleSetForm({ name: '', description: '', language: 'all', rule_type: 'custom' });
loadRuleSets();
} catch (error) {
toast({ title: '创建失败', variant: 'destructive' });
}
};
const handleUpdateRuleSet = async () => {
if (!selectedRuleSet) return;
try {
await updateRuleSet(selectedRuleSet.id, ruleSetForm);
toast({ title: '更新成功' });
setShowEditDialog(false);
loadRuleSets();
} catch (error) {
toast({ title: '更新失败', variant: 'destructive' });
}
};
const handleDeleteRuleSet = async (id: string) => {
if (!confirm('确定要删除此规则集吗?')) return;
try {
await deleteRuleSet(id);
toast({ title: '删除成功' });
loadRuleSets();
} catch (error: any) {
toast({ title: '删除失败', description: error.message, variant: 'destructive' });
}
};
const handleExport = async (ruleSet: AuditRuleSet) => {
try {
const blob = await exportRuleSet(ruleSet.id);
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = `${ruleSet.name}.json`;
a.click();
URL.revokeObjectURL(url);
toast({ title: '导出成功' });
} catch (error) {
toast({ title: '导出失败', variant: 'destructive' });
}
};
const handleImport = async () => {
try {
const data = JSON.parse(importJson);
await importRuleSet(data);
toast({ title: '导入成功' });
setShowImportDialog(false);
setImportJson('');
loadRuleSets();
} catch (error: any) {
toast({ title: '导入失败', description: error.message, variant: 'destructive' });
}
};
const handleAddRule = async () => {
if (!selectedRuleSet) return;
try {
await addRuleToSet(selectedRuleSet.id, ruleForm);
toast({ title: '添加成功' });
setShowRuleDialog(false);
setRuleForm({
rule_code: '',
name: '',
description: '',
category: 'security',
severity: 'medium',
custom_prompt: '',
fix_suggestion: '',
reference_url: '',
enabled: true,
});
loadRuleSets();
} catch (error) {
toast({ title: '添加失败', variant: 'destructive' });
}
};
const handleUpdateRule = async () => {
if (!selectedRuleSet || !selectedRule) return;
try {
await updateRule(selectedRuleSet.id, selectedRule.id, ruleForm);
toast({ title: '更新成功' });
setShowRuleDialog(false);
loadRuleSets();
} catch (error) {
toast({ title: '更新失败', variant: 'destructive' });
}
};
const handleDeleteRule = async (ruleSetId: string, ruleId: string) => {
if (!confirm('确定要删除此规则吗?')) return;
try {
await deleteRule(ruleSetId, ruleId);
toast({ title: '删除成功' });
loadRuleSets();
} catch (error) {
toast({ title: '删除失败', variant: 'destructive' });
}
};
const handleToggleRule = async (ruleSetId: string, ruleId: string) => {
try {
const result = await toggleRule(ruleSetId, ruleId);
toast({ title: result.message });
loadRuleSets();
} catch (error) {
toast({ title: '操作失败', variant: 'destructive' });
}
};
const openEditRuleSetDialog = (ruleSet: AuditRuleSet) => {
setSelectedRuleSet(ruleSet);
setRuleSetForm({
name: ruleSet.name,
description: ruleSet.description || '',
language: ruleSet.language,
rule_type: ruleSet.rule_type,
});
setShowEditDialog(true);
};
const openAddRuleDialog = (ruleSet: AuditRuleSet) => {
setSelectedRuleSet(ruleSet);
setSelectedRule(null);
setRuleForm({
rule_code: '',
name: '',
description: '',
category: 'security',
severity: 'medium',
custom_prompt: '',
fix_suggestion: '',
reference_url: '',
enabled: true,
});
setShowRuleDialog(true);
};
const openEditRuleDialog = (ruleSet: AuditRuleSet, rule: AuditRule) => {
setSelectedRuleSet(ruleSet);
setSelectedRule(rule);
setRuleForm({
rule_code: rule.rule_code,
name: rule.name,
description: rule.description || '',
category: rule.category,
severity: rule.severity,
custom_prompt: rule.custom_prompt || '',
fix_suggestion: rule.fix_suggestion || '',
reference_url: rule.reference_url || '',
enabled: rule.enabled,
});
setShowRuleDialog(true);
};
const getCategoryInfo = (category: string) => {
return CATEGORIES.find(c => c.value === category) || CATEGORIES[0];
};
const getSeverityInfo = (severity: string) => {
return SEVERITIES.find(s => s.value === severity) || SEVERITIES[2];
};
return (
<div className="container mx-auto py-6 space-y-6">
{/* 页面标题 */}
<div className="flex items-center justify-between">
<div>
<h1 className="text-2xl font-bold"></h1>
<p className="text-muted-foreground"></p>
</div>
<div className="flex gap-2">
<Button variant="outline" onClick={() => setShowImportDialog(true)}>
<Upload className="w-4 h-4 mr-2" />
</Button>
<Button onClick={() => setShowCreateDialog(true)}>
<Plus className="w-4 h-4 mr-2" />
</Button>
</div>
</div>
{/* 规则集列表 */}
<div className="space-y-4">
{loading ? (
<div className="text-center py-8 text-muted-foreground">...</div>
) : ruleSets.length === 0 ? (
<Card>
<CardContent className="py-8 text-center text-muted-foreground">
"新建规则集"
</CardContent>
</Card>
) : (
ruleSets.map(ruleSet => (
<Card key={ruleSet.id} className={!ruleSet.is_active ? 'opacity-60' : ''}>
<CardHeader className="pb-2">
<div className="flex items-center justify-between">
<div className="flex items-center gap-3 cursor-pointer" onClick={() => toggleExpand(ruleSet.id)}>
{expandedSets.has(ruleSet.id) ? (
<ChevronDown className="w-5 h-5" />
) : (
<ChevronRight className="w-5 h-5" />
)}
<div>
<CardTitle className="flex items-center gap-2">
{ruleSet.name}
{ruleSet.is_system && <Badge variant="secondary"></Badge>}
{ruleSet.is_default && <Badge></Badge>}
</CardTitle>
<CardDescription>{ruleSet.description}</CardDescription>
</div>
</div>
<div className="flex items-center gap-2">
<Badge variant="outline">{LANGUAGES.find(l => l.value === ruleSet.language)?.label}</Badge>
<Badge variant="outline">{RULE_TYPES.find(t => t.value === ruleSet.rule_type)?.label}</Badge>
<span className="text-sm text-muted-foreground">
{ruleSet.enabled_rules_count}/{ruleSet.rules_count}
</span>
<Button variant="ghost" size="icon" onClick={() => handleExport(ruleSet)}>
<Download className="w-4 h-4" />
</Button>
{!ruleSet.is_system && (
<>
<Button variant="ghost" size="icon" onClick={() => openEditRuleSetDialog(ruleSet)}>
<Edit className="w-4 h-4" />
</Button>
<Button variant="ghost" size="icon" onClick={() => handleDeleteRuleSet(ruleSet.id)}>
<Trash2 className="w-4 h-4" />
</Button>
</>
)}
</div>
</div>
</CardHeader>
{expandedSets.has(ruleSet.id) && (
<CardContent>
<div className="space-y-2">
{!ruleSet.is_system && (
<Button variant="outline" size="sm" onClick={() => openAddRuleDialog(ruleSet)}>
<Plus className="w-4 h-4 mr-2" />
</Button>
)}
<ScrollArea className="h-[400px]">
<div className="space-y-2">
{ruleSet.rules.map(rule => {
const categoryInfo = getCategoryInfo(rule.category);
const severityInfo = getSeverityInfo(rule.severity);
const CategoryIcon = categoryInfo.icon;
return (
<div
key={rule.id}
className={`p-3 border rounded-lg ${!rule.enabled ? 'opacity-50' : ''}`}
>
<div className="flex items-start justify-between">
<div className="flex items-start gap-3">
<CategoryIcon className={`w-5 h-5 mt-0.5 ${categoryInfo.color}`} />
<div>
<div className="flex items-center gap-2">
<span className="font-mono text-sm text-muted-foreground">{rule.rule_code}</span>
<span className="font-medium">{rule.name}</span>
<Badge className={severityInfo.color}>{severityInfo.label}</Badge>
</div>
{rule.description && (
<p className="text-sm text-muted-foreground mt-1">{rule.description}</p>
)}
{rule.reference_url && (
<a
href={rule.reference_url}
target="_blank"
rel="noopener noreferrer"
className="text-sm text-blue-500 hover:underline flex items-center gap-1 mt-1"
>
<ExternalLink className="w-3 h-3" />
</a>
)}
</div>
</div>
<div className="flex items-center gap-2">
<Switch
checked={rule.enabled}
onCheckedChange={() => handleToggleRule(ruleSet.id, rule.id)}
/>
{!ruleSet.is_system && (
<>
<Button variant="ghost" size="icon" onClick={() => openEditRuleDialog(ruleSet, rule)}>
<Edit className="w-4 h-4" />
</Button>
<Button variant="ghost" size="icon" onClick={() => handleDeleteRule(ruleSet.id, rule.id)}>
<Trash2 className="w-4 h-4" />
</Button>
</>
)}
</div>
</div>
</div>
);
})}
</div>
</ScrollArea>
</div>
</CardContent>
)}
</Card>
))
)}
</div>
{/* 创建规则集对话框 */}
<Dialog open={showCreateDialog} onOpenChange={setShowCreateDialog}>
<DialogContent>
<DialogHeader>
<DialogTitle></DialogTitle>
<DialogDescription></DialogDescription>
</DialogHeader>
<div className="space-y-4">
<div>
<Label></Label>
<Input
value={ruleSetForm.name}
onChange={e => setRuleSetForm({ ...ruleSetForm, name: e.target.value })}
placeholder="规则集名称"
/>
</div>
<div>
<Label></Label>
<Textarea
value={ruleSetForm.description}
onChange={e => setRuleSetForm({ ...ruleSetForm, description: e.target.value })}
placeholder="规则集描述"
/>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<Label></Label>
<Select value={ruleSetForm.language} onValueChange={v => setRuleSetForm({ ...ruleSetForm, language: v })}>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
{LANGUAGES.map(l => <SelectItem key={l.value} value={l.value}>{l.label}</SelectItem>)}
</SelectContent>
</Select>
</div>
<div>
<Label></Label>
<Select value={ruleSetForm.rule_type} onValueChange={v => setRuleSetForm({ ...ruleSetForm, rule_type: v })}>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
{RULE_TYPES.map(t => <SelectItem key={t.value} value={t.value}>{t.label}</SelectItem>)}
</SelectContent>
</Select>
</div>
</div>
</div>
<DialogFooter>
<Button variant="outline" onClick={() => setShowCreateDialog(false)}></Button>
<Button onClick={handleCreateRuleSet}></Button>
</DialogFooter>
</DialogContent>
</Dialog>
{/* 编辑规则集对话框 */}
<Dialog open={showEditDialog} onOpenChange={setShowEditDialog}>
<DialogContent>
<DialogHeader>
<DialogTitle></DialogTitle>
</DialogHeader>
<div className="space-y-4">
<div>
<Label></Label>
<Input
value={ruleSetForm.name}
onChange={e => setRuleSetForm({ ...ruleSetForm, name: e.target.value })}
/>
</div>
<div>
<Label></Label>
<Textarea
value={ruleSetForm.description}
onChange={e => setRuleSetForm({ ...ruleSetForm, description: e.target.value })}
/>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<Label></Label>
<Select value={ruleSetForm.language} onValueChange={v => setRuleSetForm({ ...ruleSetForm, language: v })}>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
{LANGUAGES.map(l => <SelectItem key={l.value} value={l.value}>{l.label}</SelectItem>)}
</SelectContent>
</Select>
</div>
<div>
<Label></Label>
<Select value={ruleSetForm.rule_type} onValueChange={v => setRuleSetForm({ ...ruleSetForm, rule_type: v })}>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
{RULE_TYPES.map(t => <SelectItem key={t.value} value={t.value}>{t.label}</SelectItem>)}
</SelectContent>
</Select>
</div>
</div>
</div>
<DialogFooter>
<Button variant="outline" onClick={() => setShowEditDialog(false)}></Button>
<Button onClick={handleUpdateRuleSet}></Button>
</DialogFooter>
</DialogContent>
</Dialog>
{/* 规则编辑对话框 */}
<Dialog open={showRuleDialog} onOpenChange={setShowRuleDialog}>
<DialogContent className="max-w-2xl">
<DialogHeader>
<DialogTitle>{selectedRule ? '编辑规则' : '添加规则'}</DialogTitle>
</DialogHeader>
<div className="space-y-4">
<div className="grid grid-cols-2 gap-4">
<div>
<Label></Label>
<Input
value={ruleForm.rule_code}
onChange={e => setRuleForm({ ...ruleForm, rule_code: e.target.value })}
placeholder="如 SEC001"
/>
</div>
<div>
<Label></Label>
<Input
value={ruleForm.name}
onChange={e => setRuleForm({ ...ruleForm, name: e.target.value })}
placeholder="规则名称"
/>
</div>
</div>
<div>
<Label></Label>
<Textarea
value={ruleForm.description}
onChange={e => setRuleForm({ ...ruleForm, description: e.target.value })}
placeholder="规则描述"
/>
</div>
<div className="grid grid-cols-2 gap-4">
<div>
<Label></Label>
<Select value={ruleForm.category} onValueChange={v => setRuleForm({ ...ruleForm, category: v })}>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
{CATEGORIES.map(c => <SelectItem key={c.value} value={c.value}>{c.label}</SelectItem>)}
</SelectContent>
</Select>
</div>
<div>
<Label></Label>
<Select value={ruleForm.severity} onValueChange={v => setRuleForm({ ...ruleForm, severity: v })}>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
{SEVERITIES.map(s => <SelectItem key={s.value} value={s.value}>{s.label}</SelectItem>)}
</SelectContent>
</Select>
</div>
</div>
<div>
<Label></Label>
<Textarea
value={ruleForm.custom_prompt}
onChange={e => setRuleForm({ ...ruleForm, custom_prompt: e.target.value })}
placeholder="用于增强LLM检测的自定义提示词"
rows={3}
/>
</div>
<div>
<Label></Label>
<Textarea
value={ruleForm.fix_suggestion}
onChange={e => setRuleForm({ ...ruleForm, fix_suggestion: e.target.value })}
placeholder="修复建议模板"
rows={2}
/>
</div>
<div>
<Label></Label>
<Input
value={ruleForm.reference_url}
onChange={e => setRuleForm({ ...ruleForm, reference_url: e.target.value })}
placeholder="如 https://owasp.org/..."
/>
</div>
</div>
<DialogFooter>
<Button variant="outline" onClick={() => setShowRuleDialog(false)}></Button>
<Button onClick={selectedRule ? handleUpdateRule : handleAddRule}>
{selectedRule ? '保存' : '添加'}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
{/* 导入对话框 */}
<Dialog open={showImportDialog} onOpenChange={setShowImportDialog}>
<DialogContent className="max-w-2xl">
<DialogHeader>
<DialogTitle></DialogTitle>
<DialogDescription> JSON </DialogDescription>
</DialogHeader>
<Textarea
value={importJson}
onChange={e => setImportJson(e.target.value)}
placeholder='{"name": "...", "rules": [...]}'
rows={15}
className="font-mono text-sm"
/>
<DialogFooter>
<Button variant="outline" onClick={() => setShowImportDialog(false)}></Button>
<Button onClick={handleImport}></Button>
</DialogFooter>
</DialogContent>
</Dialog>
</div>
);
}

View File

@ -0,0 +1,511 @@
/**
*
*/
import { useState, useEffect } from 'react';
import { Button } from '@/components/ui/button';
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card';
import { Badge } from '@/components/ui/badge';
import { Switch } from '@/components/ui/switch';
import { Input } from '@/components/ui/input';
import { Textarea } from '@/components/ui/textarea';
import { Label } from '@/components/ui/label';
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/ui/select';
import { Dialog, DialogContent, DialogDescription, DialogFooter, DialogHeader, DialogTitle } from '@/components/ui/dialog';
import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs';
import { useToast } from '@/shared/hooks/use-toast';
import {
Plus,
Trash2,
Edit,
Copy,
Play,
FileText,
Sparkles,
Check,
Loader2,
} from 'lucide-react';
import {
getPromptTemplates,
createPromptTemplate,
updatePromptTemplate,
deletePromptTemplate,
testPromptTemplate,
type PromptTemplate,
type PromptTemplateCreate,
} from '@/shared/api/prompts';
const TEMPLATE_TYPES = [
{ value: 'system', label: '系统提示词' },
{ value: 'user', label: '用户提示词' },
{ value: 'analysis', label: '分析提示词' },
];
const TEST_CODE_SAMPLES: Record<string, string> = {
python: `def login(username, password):
query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'"
cursor.execute(query)
return cursor.fetchone()`,
javascript: `function getUserData(userId) {
const query = "SELECT * FROM users WHERE id = " + userId;
return db.query(query);
}`,
java: `public User findUser(String username) {
String query = "SELECT * FROM users WHERE username = '" + username + "'";
return jdbcTemplate.queryForObject(query, User.class);
}`,
};
export default function PromptManager() {
const { toast } = useToast();
const [templates, setTemplates] = useState<PromptTemplate[]>([]);
const [loading, setLoading] = useState(true);
// 对话框状态
const [showCreateDialog, setShowCreateDialog] = useState(false);
const [showEditDialog, setShowEditDialog] = useState(false);
const [showTestDialog, setShowTestDialog] = useState(false);
const [selectedTemplate, setSelectedTemplate] = useState<PromptTemplate | null>(null);
const [testing, setTesting] = useState(false);
const [testResult, setTestResult] = useState<any>(null);
// 表单状态
const [form, setForm] = useState<PromptTemplateCreate>({
name: '',
description: '',
template_type: 'system',
content_zh: '',
content_en: '',
is_active: true,
});
// 测试表单
const [testForm, setTestForm] = useState({
language: 'python',
code: TEST_CODE_SAMPLES.python,
});
useEffect(() => {
loadTemplates();
}, []);
const loadTemplates = async () => {
try {
setLoading(true);
const response = await getPromptTemplates();
setTemplates(response.items);
} catch (error) {
toast({ title: '加载失败', description: '无法加载提示词模板', variant: 'destructive' });
} finally {
setLoading(false);
}
};
const handleCreate = async () => {
try {
await createPromptTemplate(form);
toast({ title: '创建成功' });
setShowCreateDialog(false);
resetForm();
loadTemplates();
} catch (error) {
toast({ title: '创建失败', variant: 'destructive' });
}
};
const handleUpdate = async () => {
if (!selectedTemplate) return;
try {
await updatePromptTemplate(selectedTemplate.id, form);
toast({ title: '更新成功' });
setShowEditDialog(false);
loadTemplates();
} catch (error) {
toast({ title: '更新失败', variant: 'destructive' });
}
};
const handleDelete = async (id: string) => {
if (!confirm('确定要删除此模板吗?')) return;
try {
await deletePromptTemplate(id);
toast({ title: '删除成功' });
loadTemplates();
} catch (error: any) {
toast({ title: '删除失败', description: error.message, variant: 'destructive' });
}
};
const handleTest = async () => {
if (!selectedTemplate) return;
const content = selectedTemplate.content_zh || selectedTemplate.content_en || '';
if (!content) {
toast({ title: '提示词内容为空', variant: 'destructive' });
return;
}
setTesting(true);
setTestResult(null);
try {
const result = await testPromptTemplate({
content,
language: testForm.language,
code: testForm.code,
});
setTestResult(result);
if (result.success) {
toast({ title: '测试完成', description: `耗时 ${result.execution_time}s` });
} else {
toast({ title: '测试失败', description: result.error, variant: 'destructive' });
}
} catch (error: any) {
toast({ title: '测试失败', description: error.message, variant: 'destructive' });
} finally {
setTesting(false);
}
};
const resetForm = () => {
setForm({
name: '',
description: '',
template_type: 'system',
content_zh: '',
content_en: '',
is_active: true,
});
};
const openEditDialog = (template: PromptTemplate) => {
setSelectedTemplate(template);
setForm({
name: template.name,
description: template.description || '',
template_type: template.template_type,
content_zh: template.content_zh || '',
content_en: template.content_en || '',
is_active: template.is_active,
});
setShowEditDialog(true);
};
const openTestDialog = (template: PromptTemplate) => {
setSelectedTemplate(template);
setTestResult(null);
setShowTestDialog(true);
};
const copyToClipboard = (text: string) => {
navigator.clipboard.writeText(text);
toast({ title: '已复制到剪贴板' });
};
return (
<div className="container mx-auto py-6 space-y-6">
{/* 页面标题 */}
<div className="flex items-center justify-between">
<div>
<h1 className="text-2xl font-bold"></h1>
<p className="text-muted-foreground"></p>
</div>
<Button onClick={() => { resetForm(); setShowCreateDialog(true); }}>
<Plus className="w-4 h-4 mr-2" />
</Button>
</div>
{/* 模板列表 */}
<div className="grid gap-4 md:grid-cols-2 lg:grid-cols-3">
{loading ? (
<div className="col-span-full text-center py-8 text-muted-foreground">...</div>
) : templates.length === 0 ? (
<Card className="col-span-full">
<CardContent className="py-8 text-center text-muted-foreground">
"新建模板"
</CardContent>
</Card>
) : (
templates.map(template => (
<Card key={template.id} className={!template.is_active ? 'opacity-60' : ''}>
<CardHeader className="pb-2">
<div className="flex items-start justify-between">
<div>
<CardTitle className="flex items-center gap-2 text-lg">
<FileText className="w-5 h-5" />
{template.name}
</CardTitle>
<CardDescription className="mt-1">{template.description}</CardDescription>
</div>
<div className="flex flex-col items-end gap-1">
{template.is_system && <Badge variant="secondary"></Badge>}
{template.is_default && <Badge></Badge>}
<Badge variant="outline">
{TEMPLATE_TYPES.find(t => t.value === template.template_type)?.label}
</Badge>
</div>
</div>
</CardHeader>
<CardContent>
<div className="space-y-3">
{/* 预览 */}
<div className="text-sm text-muted-foreground line-clamp-3 bg-muted p-2 rounded">
{template.content_zh || template.content_en || '(无内容)'}
</div>
{/* 操作按钮 */}
<div className="flex items-center justify-between">
<div className="flex gap-1">
<Button variant="ghost" size="sm" onClick={() => openTestDialog(template)}>
<Play className="w-4 h-4 mr-1" />
</Button>
<Button
variant="ghost"
size="sm"
onClick={() => copyToClipboard(template.content_zh || template.content_en || '')}
>
<Copy className="w-4 h-4 mr-1" />
</Button>
</div>
<div className="flex gap-1">
{!template.is_system && (
<>
<Button variant="ghost" size="icon" onClick={() => openEditDialog(template)}>
<Edit className="w-4 h-4" />
</Button>
<Button variant="ghost" size="icon" onClick={() => handleDelete(template.id)}>
<Trash2 className="w-4 h-4" />
</Button>
</>
)}
</div>
</div>
</div>
</CardContent>
</Card>
))
)}
</div>
{/* 创建/编辑对话框 */}
<Dialog open={showCreateDialog || showEditDialog} onOpenChange={(open) => {
if (!open) {
setShowCreateDialog(false);
setShowEditDialog(false);
}
}}>
<DialogContent className="max-w-4xl max-h-[90vh] overflow-y-auto">
<DialogHeader>
<DialogTitle>{showEditDialog ? '编辑模板' : '新建模板'}</DialogTitle>
<DialogDescription>
{showEditDialog ? '修改提示词模板内容' : '创建自定义提示词模板'}
</DialogDescription>
</DialogHeader>
<div className="space-y-4">
<div className="grid grid-cols-2 gap-4">
<div>
<Label></Label>
<Input
value={form.name}
onChange={e => setForm({ ...form, name: e.target.value })}
placeholder="如:安全专项审计"
/>
</div>
<div>
<Label></Label>
<Select value={form.template_type} onValueChange={v => setForm({ ...form, template_type: v })}>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
{TEMPLATE_TYPES.map(t => <SelectItem key={t.value} value={t.value}>{t.label}</SelectItem>)}
</SelectContent>
</Select>
</div>
</div>
<div>
<Label></Label>
<Input
value={form.description}
onChange={e => setForm({ ...form, description: e.target.value })}
placeholder="模板用途描述"
/>
</div>
<Tabs defaultValue="zh" className="w-full">
<TabsList>
<TabsTrigger value="zh"></TabsTrigger>
<TabsTrigger value="en"></TabsTrigger>
</TabsList>
<TabsContent value="zh">
<Textarea
value={form.content_zh}
onChange={e => setForm({ ...form, content_zh: e.target.value })}
placeholder="输入中文提示词内容..."
rows={15}
className="font-mono text-sm"
/>
</TabsContent>
<TabsContent value="en">
<Textarea
value={form.content_en}
onChange={e => setForm({ ...form, content_en: e.target.value })}
placeholder="Enter English prompt content..."
rows={15}
className="font-mono text-sm"
/>
</TabsContent>
</Tabs>
<div className="flex items-center gap-2">
<Switch
checked={form.is_active}
onCheckedChange={v => setForm({ ...form, is_active: v })}
/>
<Label></Label>
</div>
</div>
<DialogFooter>
<Button variant="outline" onClick={() => { setShowCreateDialog(false); setShowEditDialog(false); }}>
</Button>
<Button onClick={showEditDialog ? handleUpdate : handleCreate}>
{showEditDialog ? '保存' : '创建'}
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
{/* 测试对话框 */}
<Dialog open={showTestDialog} onOpenChange={setShowTestDialog}>
<DialogContent className="max-w-4xl max-h-[90vh] overflow-y-auto">
<DialogHeader>
<DialogTitle className="flex items-center gap-2">
<Sparkles className="w-5 h-5" />
: {selectedTemplate?.name}
</DialogTitle>
<DialogDescription>
使
</DialogDescription>
</DialogHeader>
<div className="grid grid-cols-2 gap-4">
{/* 左侧:输入 */}
<div className="space-y-4">
<div>
<Label></Label>
<Select
value={testForm.language}
onValueChange={v => {
setTestForm({
language: v,
code: TEST_CODE_SAMPLES[v] || TEST_CODE_SAMPLES.python,
});
}}
>
<SelectTrigger><SelectValue /></SelectTrigger>
<SelectContent>
<SelectItem value="python">Python</SelectItem>
<SelectItem value="javascript">JavaScript</SelectItem>
<SelectItem value="java">Java</SelectItem>
</SelectContent>
</Select>
</div>
<div>
<Label></Label>
<Textarea
value={testForm.code}
onChange={e => setTestForm({ ...testForm, code: e.target.value })}
rows={10}
className="font-mono text-sm"
/>
</div>
<Button onClick={handleTest} disabled={testing} className="w-full">
{testing ? (
<>
<Loader2 className="w-4 h-4 mr-2 animate-spin" />
...
</>
) : (
<>
<Play className="w-4 h-4 mr-2" />
</>
)}
</Button>
</div>
{/* 右侧:结果 */}
<div className="space-y-4">
<Label></Label>
<div className="border rounded-lg p-4 h-[400px] overflow-auto bg-muted">
{testResult ? (
testResult.success ? (
<div className="space-y-4">
<div className="flex items-center gap-2 text-green-600">
<Check className="w-5 h-5" />
<span> ( {testResult.execution_time}s)</span>
</div>
{testResult.result?.issues?.length > 0 ? (
<div className="space-y-2">
<div className="font-medium"> {testResult.result.issues.length} :</div>
{testResult.result.issues.map((issue: any, idx: number) => (
<div key={idx} className="p-2 bg-background rounded border">
<div className="flex items-center gap-2">
<Badge variant={
issue.severity === 'critical' ? 'destructive' :
issue.severity === 'high' ? 'destructive' :
issue.severity === 'medium' ? 'default' : 'secondary'
}>
{issue.severity}
</Badge>
<span className="font-medium">{issue.title}</span>
</div>
<p className="text-sm text-muted-foreground mt-1">{issue.description}</p>
{issue.line && (
<p className="text-xs text-muted-foreground"> {issue.line}</p>
)}
</div>
))}
</div>
) : (
<div className="text-muted-foreground"></div>
)}
{testResult.result?.quality_score !== undefined && (
<div className="text-sm">
: <span className="font-bold">{testResult.result.quality_score}</span>
</div>
)}
</div>
) : (
<div className="text-red-500">
<div className="font-medium"></div>
<div className="text-sm mt-1">{testResult.error}</div>
</div>
)
) : (
<div className="text-muted-foreground text-center py-8">
"运行测试"
</div>
)}
</div>
</div>
</div>
<DialogFooter>
<Button variant="outline" onClick={() => setShowTestDialog(false)}></Button>
</DialogFooter>
</DialogContent>
</Dialog>
</div>
);
}

View File

@ -0,0 +1,114 @@
/**
* API
*/
import { apiClient } from './serverClient';
export interface PromptTemplate {
id: string;
name: string;
description?: string;
template_type: string;
content_zh?: string;
content_en?: string;
variables: Record<string, string>;
is_default: boolean;
is_system: boolean;
is_active: boolean;
sort_order: number;
created_by?: string;
created_at?: string;
updated_at?: string;
}
export interface PromptTemplateListResponse {
items: PromptTemplate[];
total: number;
}
export interface PromptTemplateCreate {
name: string;
description?: string;
template_type?: string;
content_zh?: string;
content_en?: string;
variables?: Record<string, string>;
is_active?: boolean;
sort_order?: number;
}
export interface PromptTemplateUpdate {
name?: string;
description?: string;
template_type?: string;
content_zh?: string;
content_en?: string;
variables?: Record<string, string>;
is_active?: boolean;
sort_order?: number;
}
export interface PromptTestRequest {
content: string;
language: string;
code: string;
}
export interface PromptTestResponse {
success: boolean;
result?: Record<string, unknown>;
error?: string;
execution_time?: number;
}
// 获取提示词模板列表
export async function getPromptTemplates(params?: {
skip?: number;
limit?: number;
template_type?: string;
is_active?: boolean;
}): Promise<PromptTemplateListResponse> {
const searchParams = new URLSearchParams();
if (params?.skip !== undefined) searchParams.set('skip', String(params.skip));
if (params?.limit !== undefined) searchParams.set('limit', String(params.limit));
if (params?.template_type) searchParams.set('template_type', params.template_type);
if (params?.is_active !== undefined) searchParams.set('is_active', String(params.is_active));
const query = searchParams.toString();
const response = await apiClient.get(`/prompts${query ? `?${query}` : ''}`);
return response.data;
}
// 获取单个提示词模板
export async function getPromptTemplate(id: string): Promise<PromptTemplate> {
const response = await apiClient.get(`/prompts/${id}`);
return response.data;
}
// 创建提示词模板
export async function createPromptTemplate(data: PromptTemplateCreate): Promise<PromptTemplate> {
const response = await apiClient.post('/prompts', data);
return response.data;
}
// 更新提示词模板
export async function updatePromptTemplate(id: string, data: PromptTemplateUpdate): Promise<PromptTemplate> {
const response = await apiClient.put(`/prompts/${id}`, data);
return response.data;
}
// 删除提示词模板
export async function deletePromptTemplate(id: string): Promise<void> {
await apiClient.delete(`/prompts/${id}`);
}
// 测试提示词
export async function testPromptTemplate(data: PromptTestRequest): Promise<PromptTestResponse> {
const response = await apiClient.post('/prompts/test', data);
return response.data;
}
// 设置默认模板
export async function setDefaultPromptTemplate(id: string): Promise<void> {
await apiClient.post(`/prompts/${id}/set-default`);
}

View File

@ -0,0 +1,180 @@
/**
* API
*/
import { apiClient } from './serverClient';
export interface AuditRule {
id: string;
rule_set_id: string;
rule_code: string;
name: string;
description?: string;
category: string;
severity: string;
custom_prompt?: string;
fix_suggestion?: string;
reference_url?: string;
enabled: boolean;
sort_order: number;
created_at?: string;
updated_at?: string;
}
export interface AuditRuleSet {
id: string;
name: string;
description?: string;
language: string;
rule_type: string;
severity_weights: Record<string, number>;
is_default: boolean;
is_system: boolean;
is_active: boolean;
sort_order: number;
created_by?: string;
created_at?: string;
updated_at?: string;
rules: AuditRule[];
rules_count: number;
enabled_rules_count: number;
}
export interface AuditRuleSetListResponse {
items: AuditRuleSet[];
total: number;
}
export interface AuditRuleCreate {
rule_code: string;
name: string;
description?: string;
category: string;
severity?: string;
custom_prompt?: string;
fix_suggestion?: string;
reference_url?: string;
enabled?: boolean;
sort_order?: number;
}
export interface AuditRuleSetCreate {
name: string;
description?: string;
language?: string;
rule_type?: string;
severity_weights?: Record<string, number>;
is_active?: boolean;
sort_order?: number;
rules?: AuditRuleCreate[];
}
export interface AuditRuleSetUpdate {
name?: string;
description?: string;
language?: string;
rule_type?: string;
severity_weights?: Record<string, number>;
is_active?: boolean;
sort_order?: number;
}
export interface AuditRuleUpdate {
rule_code?: string;
name?: string;
description?: string;
category?: string;
severity?: string;
custom_prompt?: string;
fix_suggestion?: string;
reference_url?: string;
enabled?: boolean;
sort_order?: number;
}
// 获取规则集列表
export async function getRuleSets(params?: {
skip?: number;
limit?: number;
language?: string;
rule_type?: string;
is_active?: boolean;
}): Promise<AuditRuleSetListResponse> {
const searchParams = new URLSearchParams();
if (params?.skip !== undefined) searchParams.set('skip', String(params.skip));
if (params?.limit !== undefined) searchParams.set('limit', String(params.limit));
if (params?.language) searchParams.set('language', params.language);
if (params?.rule_type) searchParams.set('rule_type', params.rule_type);
if (params?.is_active !== undefined) searchParams.set('is_active', String(params.is_active));
const query = searchParams.toString();
const response = await apiClient.get(`/rules${query ? `?${query}` : ''}`);
return response.data;
}
// 获取单个规则集
export async function getRuleSet(id: string): Promise<AuditRuleSet> {
const response = await apiClient.get(`/rules/${id}`);
return response.data;
}
// 创建规则集
export async function createRuleSet(data: AuditRuleSetCreate): Promise<AuditRuleSet> {
const response = await apiClient.post('/rules', data);
return response.data;
}
// 更新规则集
export async function updateRuleSet(id: string, data: AuditRuleSetUpdate): Promise<AuditRuleSet> {
const response = await apiClient.put(`/rules/${id}`, data);
return response.data;
}
// 删除规则集
export async function deleteRuleSet(id: string): Promise<void> {
await apiClient.delete(`/rules/${id}`);
}
// 导出规则集
export async function exportRuleSet(id: string): Promise<Blob> {
const response = await apiClient.get(`/rules/${id}/export`, {
responseType: 'blob',
});
return response.data;
}
// 导入规则集
export async function importRuleSet(data: {
name: string;
description?: string;
language?: string;
rule_type?: string;
severity_weights?: Record<string, number>;
rules: AuditRuleCreate[];
}): Promise<AuditRuleSet> {
const response = await apiClient.post('/rules/import', data);
return response.data;
}
// 添加规则到规则集
export async function addRuleToSet(ruleSetId: string, data: AuditRuleCreate): Promise<AuditRule> {
const response = await apiClient.post(`/rules/${ruleSetId}/rules`, data);
return response.data;
}
// 更新规则
export async function updateRule(ruleSetId: string, ruleId: string, data: AuditRuleUpdate): Promise<AuditRule> {
const response = await apiClient.put(`/rules/${ruleSetId}/rules/${ruleId}`, data);
return response.data;
}
// 删除规则
export async function deleteRule(ruleSetId: string, ruleId: string): Promise<void> {
await apiClient.delete(`/rules/${ruleSetId}/rules/${ruleId}`);
}
// 切换规则启用状态
export async function toggleRule(ruleSetId: string, ruleId: string): Promise<{ enabled: boolean; message: string }> {
const response = await apiClient.put(`/rules/${ruleSetId}/rules/${ruleId}/toggle`);
return response.data;
}