diff --git a/backend/app/api/v1/endpoints/scan.py b/backend/app/api/v1/endpoints/scan.py
index 24f64aa..c43e411 100644
--- a/backend/app/api/v1/endpoints/scan.py
+++ b/backend/app/api/v1/endpoints/scan.py
@@ -92,6 +92,7 @@ async def process_zip_task(task_id: str, file_path: str, db_session_factory, use
     # Get analysis config (user config takes precedence)
     analysis_config = get_analysis_config(user_config)
     max_analyze_files = analysis_config['max_analyze_files']
+    analysis_concurrency = analysis_config['llm_concurrency']
     llm_gap_ms = analysis_config['llm_gap_ms']
 
     # Limit the number of files
@@ -116,42 +117,88 @@ async def process_zip_task(task_id: str, file_path: str, db_session_factory, use
     scanned_files = 0
     failed_files = 0
 
-    for file_info in files_to_scan:
-        # Check for cancellation
-        if task_control.is_cancelled(task_id):
-            print(f"🛑 ZIP task {task_id} has been cancelled")
-            task.status = "cancelled"
-            task.completed_at = datetime.now(timezone.utc)
-            await db.commit()
-            task_control.cleanup_task(task_id)
-            return
+    # Analyze files in parallel
+    print(f"🧬 ZIP task {task_id}: starting parallel analysis: {len(files_to_scan)} files, concurrency: {analysis_concurrency}")
+
+    semaphore = asyncio.Semaphore(analysis_concurrency)
+    last_error = None
+
+    async def analyze_single_file(file_info):
+        """Inner helper: analyze a single file and return the result"""
+        nonlocal last_error
+
+        async with semaphore:
+            if task_control.is_cancelled(task_id):
+                return None
+
+            f_path = file_info['path']
+            MAX_RETRIES = 3
+            for attempt in range(MAX_RETRIES):
+                try:
+                    content = file_info['content']
+                    language = get_language_from_path(f_path)
+
+                    # Get rule set and prompt template IDs
+                    scan_config = (user_config or {}).get('scan_config', {})
+                    rule_set_id = scan_config.get('rule_set_id')
+                    prompt_template_id = scan_config.get('prompt_template_id')
+
+                    # Analyze using the rule set and prompt template
+                    if rule_set_id or prompt_template_id:
+                        analysis_result = await llm_service.analyze_code_with_rules(
+                            content, language,
+                            rule_set_id=rule_set_id,
+                            prompt_template_id=prompt_template_id,
+                            db_session=None  # Don't pass the db_session into parallel workers, to avoid races
+                        )
+                    else:
+                        analysis_result = await llm_service.analyze_code(content, language)
+
+                    return {
+                        "type": "success",
+                        "path": f_path,
+                        "content": content,
+                        "language": language,
+                        "analysis": analysis_result
+                    }
+                except Exception as e:
+                    if attempt < MAX_RETRIES - 1:
+                        wait_time = (attempt + 1) * 2
+                        print(f"⚠️ ZIP task failed to analyze file ({f_path}), retrying (attempt {attempt+1})... Error: {e}")
+                        await asyncio.sleep(wait_time)
+                        continue
+                    else:
+                        print(f"❌ ZIP task analysis ultimately failed for file ({f_path}): {e}")
+                        last_error = str(e)
+                        return {"type": "error", "path": f_path, "error": str(e)}
 
-        try:
-            content = file_info['content']
+    # Create all analysis tasks
+    analysis_tasks = [analyze_single_file(f) for f in files_to_scan]
+
+    # Consume results with as_completed
+    for future in asyncio.as_completed(analysis_tasks):
+        if task_control.is_cancelled(task_id):
+            continue
+
+        res = await future
+        if not res: continue
+
+        if res["type"] == "error":
+            failed_files += 1
+        elif res["type"] == "success":
+            scanned_files += 1
+
+            f_path = res["path"]
+            analysis = res["analysis"]
+            content = res["content"]
             total_lines += content.count('\n') + 1
-            language = get_language_from_path(file_info['path'])
 
-            # Get rule set and prompt template IDs
-            scan_config = (user_config or {}).get('scan_config', {})
-            rule_set_id = scan_config.get('rule_set_id')
-            prompt_template_id = scan_config.get('prompt_template_id')
-
-            # Analyze using the rule set and prompt template
-            if rule_set_id or prompt_template_id:
-                result = await llm_service.analyze_code_with_rules(
-                    content, language,
-                    rule_set_id=rule_set_id,
-                    prompt_template_id=prompt_template_id,
-                    db_session=db
-                )
-            else:
-                result = await llm_service.analyze_code(content, language)
-
-            issues = result.get("issues", [])
+            # Save issues
+            issues = analysis.get("issues", [])
             for i in issues:
                 issue = AuditIssue(
                     task_id=task.id,
-                    file_path=file_info['path'],
+                    file_path=f_path,
                     line_number=i.get('line', 1),
                     column_number=i.get('column'),
                     issue_type=i.get('type', 'maintainability'),
@@ -167,24 +214,18 @@ async def process_zip_task(task_id: str, file_path: str, db_session_factory, use
                 db.add(issue)
                 total_issues += 1
 
-            if "quality_score" in result:
-                quality_scores.append(result["quality_score"])
-
-            scanned_files += 1
-            task.scanned_files = scanned_files
-            task.total_lines = total_lines
-            task.issues_count = total_issues
-            await db.commit()
-
-            print(f"📈 ZIP task {task_id}: progress {scanned_files}/{len(files_to_scan)}")
-
-            # Gap between requests
-            await asyncio.sleep(llm_gap_ms / 1000)
+            if "quality_score" in analysis:
+                quality_scores.append(analysis["quality_score"])
 
-        except Exception as file_error:
-            failed_files += 1
-            print(f"❌ ZIP task failed to analyze file ({file_info['path']}): {file_error}")
-            await asyncio.sleep(llm_gap_ms / 1000)
+        # Update main task progress
+        task.scanned_files = scanned_files
+        task.total_lines = total_lines
+        task.issues_count = total_issues
+        await db.commit()
+
+        processed_total = scanned_files + failed_files
+        if processed_total % 10 == 0 or processed_total == len(files_to_scan):
+            print(f"📈 ZIP task {task_id}: progress {processed_total}/{len(files_to_scan)}")
 
     # Finish the task
     avg_quality_score = sum(quality_scores) / len(quality_scores) if quality_scores else 100.0
diff --git a/backend/app/core/config.py b/backend/app/core/config.py
index 9264657..4edf43b 100644
--- a/backend/app/core/config.py
+++ b/backend/app/core/config.py
@@ -41,7 +41,7 @@ class Settings(BaseSettings):
     LLM_API_KEY: Optional[str] = None
     LLM_MODEL: Optional[str] = None  # Falls back to the provider's default model when unset
    LLM_BASE_URL: Optional[str] = None  # Custom API endpoint (e.g. a relay/proxy)
-    LLM_TIMEOUT: int = 150  # Timeout (seconds)
+    LLM_TIMEOUT: int = 300  # Timeout (seconds)
     LLM_TEMPERATURE: float = 0.1
     LLM_MAX_TOKENS: int = 4096
 
@@ -74,8 +74,8 @@ class Settings(BaseSettings):
     # Scan settings
     MAX_ANALYZE_FILES: int = 0  # Max number of files to analyze, 0 means unlimited
     MAX_FILE_SIZE_BYTES: int = 200 * 1024  # Max file size, 200KB
-    LLM_CONCURRENCY: int = 3  # Number of concurrent LLM requests
-    LLM_GAP_MS: int = 2000  # Gap between LLM requests (milliseconds)
+    LLM_CONCURRENCY: int = 10  # Number of concurrent LLM requests
+    LLM_GAP_MS: int = 0  # Gap between LLM requests (milliseconds)
 
     # ZIP file storage settings
     ZIP_STORAGE_PATH: str = "./uploads/zip_files"  # ZIP file storage directory
diff --git a/backend/app/services/agent/agents/recon.py b/backend/app/services/agent/agents/recon.py
index 1f0a53b..22d8ba4 100644
--- a/backend/app/services/agent/agents/recon.py
+++ b/backend/app/services/agent/agents/recon.py
@@ -554,7 +554,7 @@ Final Answer: [result in JSON format]"""
                 await self.emit_llm_decision("Continue thinking", "LLM needs more information")
                 self._conversation_history.append({
                     "role": "user",
-                    "content": "Please continue. You output a Thought but no Action. **Immediately** pick a tool to execute (Action: ...), or, if information gathering is complete, output the Final Answer.",
+                    "content": "If information gathering is complete, output the Final Answer immediately. If it is not complete: you just output a Thought but no Action. **Immediately** pick a tool to execute (Action: ...)",
                 })
 
         # 🔥 If the loop ends without a final_result, force the LLM to summarize
diff --git a/backend/app/services/agent/agents/verification.py b/backend/app/services/agent/agents/verification.py
index cd6eed4..1a0be72 100644
--- a/backend/app/services/agent/agents/verification.py
+++ b/backend/app/services/agent/agents/verification.py
@@ -818,7 +818,7 @@ class VerificationAgent(BaseAgent):
                 await self.emit_llm_decision("Continue verifying", "LLM needs more verification")
                 self._conversation_history.append({
                     "role": "user",
-                    "content": "Please continue verifying. You output a Thought but no Action. **Immediately** pick a tool to execute, or, if verification is complete, output a Final Answer summarizing all verification results.",
+                    "content": "If verification is complete, immediately output a Final Answer summarizing all verification results. If verification is not complete: you just output a Thought but no Action. **Immediately** pick a tool to execute",
                 })
 
         # Process the results
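
Note on the two prompt tweaks above: both agents append a corrective user turn whenever the model emits a Thought without an Action, and the rewording simply puts the "already done, output Final Answer" branch first so a finished model is steered straight to its answer instead of into another tool call. A rough sketch of the loop step these nudges assume; the parsing helpers and message shapes here are hypothetical, not the project's actual API:

    from typing import Optional

    def react_step(llm_output: str, history: list) -> Optional[str]:
        """Return the final answer if present; otherwise nudge the model and loop."""
        if "Final Answer:" in llm_output:
            return llm_output.split("Final Answer:", 1)[1].strip()
        if "Action:" in llm_output:
            return None  # a tool call was chosen; dispatch it elsewhere
        # Thought with no Action: append the corrective turn, "done" branch first.
        history.append({
            "role": "user",
            "content": "If information gathering is complete, output the Final Answer "
                       "immediately. Otherwise you output a Thought without an Action: "
                       "**immediately** pick a tool to execute (Action: ...)",
        })
        return None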
diff --git a/backend/app/services/scanner.py b/backend/app/services/scanner.py
index 9eb23a7..d206d93 100644
--- a/backend/app/services/scanner.py
+++ b/backend/app/services/scanner.py
@@ -531,6 +531,7 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
     # Get analysis config (user config takes precedence)
     analysis_config = get_analysis_config(user_config)
     max_analyze_files = analysis_config['max_analyze_files']
+    analysis_concurrency = analysis_config['llm_concurrency']  # concurrency level
     llm_gap_ms = analysis_config['llm_gap_ms']
 
     # Limit the number of files
@@ -558,109 +559,114 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
     MAX_CONSECUTIVE_FAILURES = 5
     last_error = None
 
-    for file_info in files:
-        # Check for cancellation
-        if task_control.is_cancelled(task_id):
-            print(f"🛑 Task {task_id} was cancelled by the user")
-            task.status = "cancelled"
-            task.completed_at = datetime.now(timezone.utc)
-            await db.commit()
-            task_control.cleanup_task(task_id)
-            return
-
-        # Check the consecutive-failure count
-        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
-            print(f"❌ Task {task_id}: {consecutive_failures} consecutive failures, stopping analysis")
-            raise Exception(f"{consecutive_failures} consecutive failures, the LLM API service may be having problems")
-
-        try:
-            # Fetch the file content
-
-            if is_ssh_url:
-                # The SSH path already includes the file content
-                content = file_info.get('content', '')
-                if not content:
-                    print(f"⚠️ SSH file content is empty: {file_info['path']}")
-                print(f"📥 Processing SSH file: {file_info['path']}")
-            else:
-                headers = {}
-                # Use the extracted token or the user-configured token
-
-                if repo_type == "gitlab":
-                    token_to_use = file_info.get('token') or gitlab_token
-                    if token_to_use:
-                        headers["PRIVATE-TOKEN"] = token_to_use
-                elif repo_type == "gitea":
-                    token_to_use = file_info.get('token') or gitea_token
-                    if token_to_use:
-                        headers["Authorization"] = f"token {token_to_use}"
-                elif repo_type == "github":
-                    # GitHub raw URLs are also direct downloads; public repos usually need no token, private ones do
-                    # GitHub raw user content url: raw.githubusercontent.com
-                    if github_token:
-                        headers["Authorization"] = f"Bearer {github_token}"
-
-                print(f"📥 Fetching file: {file_info['path']}")
-                content = await fetch_file_content(file_info["url"], headers)
-
-            if not content or not content.strip():
-                print(f"⚠️ File content is empty, skipping: {file_info['path']}")
-                skipped_files += 1
-                # Decrement the total file count: a skipped file should not be counted
-                task.total_files = max(0, task.total_files - 1)
-                await db.commit()
-                continue
-
-            if len(content) > settings.MAX_FILE_SIZE_BYTES:
-                print(f"⚠️ File too large, skipping: {file_info['path']}")
-                skipped_files += 1
-                # Decrement the total file count because the file was skipped
-                task.total_files = max(0, task.total_files - 1)
-                await db.commit()
-                continue
-
-            file_lines = content.split('\n')
-            total_lines = len(file_lines) + 1
-            language = get_language_from_path(file_info["path"])
-
-            print(f"🤖 Calling the LLM to analyze: {file_info['path']} ({language}, {len(content)} bytes)")
-            # LLM analysis - supports rule sets and prompt templates
-            scan_config = (user_config or {}).get('scan_config', {})
-            rule_set_id = scan_config.get('rule_set_id')
-            prompt_template_id = scan_config.get('prompt_template_id')
-
-            if rule_set_id or prompt_template_id:
-                analysis = await llm_service.analyze_code_with_rules(
-                    content, language,
-                    rule_set_id=rule_set_id,
-                    prompt_template_id=prompt_template_id,
-                    db_session=db
-                )
-            else:
-                analysis = await llm_service.analyze_code(content, language)
-            print(f"✅ LLM analysis complete: {file_info['path']}")
-
-            # Check for cancellation again (after the LLM analysis)
+    # 4. Analyze files in parallel
+    print(f"🧬 Starting parallel analysis: {len(files)} files, concurrency: {analysis_concurrency}")
+
+    semaphore = asyncio.Semaphore(analysis_concurrency)
+
+    async def analyze_single_file(file_info):
+        """Inner helper: analyze a single file and return the result"""
+        nonlocal consecutive_failures, last_error
+
+        async with semaphore:
             if task_control.is_cancelled(task_id):
-                print(f"🛑 Task {task_id} was cancelled after LLM analysis")
-                task.status = "cancelled"
-                task.completed_at = datetime.now(timezone.utc)
-                await db.commit()
-                task_control.cleanup_task(task_id)
-                return
+                return None
+
+            f_path = file_info['path']
+            MAX_RETRIES = 3
+            for attempt in range(MAX_RETRIES):
+                try:
+                    # 4.1 Fetch the file content (only on the first attempt; retries reuse it)
+                    if attempt == 0:
+                        if is_ssh_url:
+                            content = file_info.get('content', '')
+                        else:
+                            headers = {}
+                            if repo_type == "gitlab":
+                                token_to_use = file_info.get('token') or gitlab_token
+                                if token_to_use: headers["PRIVATE-TOKEN"] = token_to_use
+                            elif repo_type == "gitea":
+                                token_to_use = file_info.get('token') or gitea_token
+                                if token_to_use: headers["Authorization"] = f"token {token_to_use}"
+                            elif repo_type == "github" and github_token:
+                                headers["Authorization"] = f"Bearer {github_token}"
+
+                            content = await fetch_file_content(file_info["url"], headers)
+
+                        if not content or not content.strip():
+                            return {"type": "skip", "reason": "empty", "path": f_path}
+
+                        if len(content) > settings.MAX_FILE_SIZE_BYTES:
+                            return {"type": "skip", "reason": "too_large", "path": f_path}
+
+                    # 4.2 LLM analysis
+                    language = get_language_from_path(f_path)
+                    scan_config = (user_config or {}).get('scan_config', {})
+                    rule_set_id = scan_config.get('rule_set_id')
+                    prompt_template_id = scan_config.get('prompt_template_id')
+
+                    if rule_set_id or prompt_template_id:
+                        analysis_result = await llm_service.analyze_code_with_rules(
+                            content, language,
+                            rule_set_id=rule_set_id,
+                            prompt_template_id=prompt_template_id,
+                            db_session=None
+                        )
+                    else:
+                        analysis_result = await llm_service.analyze_code(content, language)
+
+                    return {
+                        "type": "success",
+                        "path": f_path,
+                        "content": content,
+                        "language": language,
+                        "analysis": analysis_result
+                    }
+                except Exception as e:
+                    if attempt < MAX_RETRIES - 1:
+                        wait_time = (attempt + 1) * 2
+                        print(f"⚠️ Failed to analyze file ({f_path}), retrying (attempt {attempt+1})... Error: {e}")
+                        await asyncio.sleep(wait_time)
+                        continue
+                    else:
+                        print(f"❌ Analysis ultimately failed for file ({f_path}): {e}")
+                        last_error = str(e)
+                        return {"type": "error", "path": f_path, "error": str(e)}
+
+    # Create all analysis tasks
+    analysis_tasks = [analyze_single_file(f) for f in files]
+
+    # Consume results with as_completed: progress updates in real time and the current db session is used safely
+    for future in asyncio.as_completed(analysis_tasks):
+        if task_control.is_cancelled(task_id):
+            # Stop processing tasks that complete after cancellation
+            continue
+
+        res = await future
+        if not res: continue
+
+        if res["type"] == "skip":
+            skipped_files += 1
+            task.total_files = max(0, task.total_files - 1)
+        elif res["type"] == "error":
+            failed_files += 1
+            consecutive_failures += 1
+        elif res["type"] == "success":
+            consecutive_failures = 0
+            scanned_files += 1
+
+            f_path = res["path"]
+            analysis = res["analysis"]
+            file_lines = res["content"].split('\n')
+            total_lines += len(file_lines)
 
             # Save issues
             issues = analysis.get("issues", [])
             for issue in issues:
                 line_num = issue.get("line", 1)
-
-                # Robust snippet-extraction logic:
-                # prefer the snippet returned by the LLM; if it is empty, extract from the source
                 code_snippet = issue.get("code_snippet")
                 if not code_snippet or len(code_snippet.strip()) < 5:
-                    # Extract context from the source (2 lines before and after)
                     try:
-                        # line_num is 1-based
                         idx = max(0, int(line_num) - 1)
                         start = max(0, idx - 2)
                         end = min(len(file_lines), idx + 3)
@@ -670,7 +676,7 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
 
                 audit_issue = AuditIssue(
                     task_id=task.id,
-                    file_path=file_info["path"],
+                    file_path=f_path,
                     line_number=line_num,
                     column_number=issue.get("column"),
                     issue_type=issue.get("type", "maintainability"),
@@ -687,35 +693,20 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N
 
             if "quality_score" in analysis:
                 quality_scores.append(analysis["quality_score"])
-
-            consecutive_failures = 0  # reset after a success
-            scanned_files += 1
-
-            # Update progress (successes + failures)
-            processed_count = scanned_files + failed_files
-            task.scanned_files = processed_count
-            task.total_lines = total_lines
-            task.issues_count = total_issues
-            await db.commit()
-
-            print(f"📈 Task {task_id}: progress {processed_count}/{task.total_files} ({int(processed_count/task.total_files*100) if task.total_files > 0 else 0}%)")
-
-            # Gap between requests
-            await asyncio.sleep(llm_gap_ms / 1000)
-
-        except Exception as file_error:
-            failed_files += 1
-            consecutive_failures += 1
-            # Failures also update progress
-            task.scanned_files = scanned_files + failed_files
-            await db.commit()
-
-            # Print detailed error info
-            import traceback
-            print(f"❌ Failed to analyze file ({file_info['path']}): {file_error}")
-            print(f"   Error type: {type(file_error).__name__}")
-            print(f"   Details: {traceback.format_exc()}")
-            await asyncio.sleep(llm_gap_ms / 1000)
+
+        # Update main task progress
+        processed_count = scanned_files + failed_files
+        task.scanned_files = processed_count
+        task.total_lines = total_lines
+        task.issues_count = total_issues
+        await db.commit()  # These commits run sequentially inside one coroutine, which is safe
+
+        if processed_count % 10 == 0 or processed_count == len(files):
+            print(f"📈 Task {task_id}: progress {processed_count}/{len(files)} ({int(processed_count/len(files)*100) if len(files) > 0 else 0}%)")
+
+        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
+            print(f"❌ Task {task_id}: {consecutive_failures} consecutive failures, stopping analysis")
+            break
 
     # 5. Finish the task
    avg_quality_score = sum(quality_scores) / len(quality_scores) if quality_scores else 100.0