diff --git a/backend/app/api/v1/api.py b/backend/app/api/v1/api.py index c233682..285a3aa 100644 --- a/backend/app/api/v1/api.py +++ b/backend/app/api/v1/api.py @@ -1,5 +1,5 @@ from fastapi import APIRouter -from app.api.v1.endpoints import auth, users, projects, tasks, scan, members, config, database, prompts, rules, agent_tasks, embedding_config +from app.api.v1.endpoints import auth, users, projects, tasks, scan, members, config, database, prompts, rules, agent_tasks, embedding_config, ssh_keys api_router = APIRouter() api_router.include_router(auth.router, prefix="/auth", tags=["auth"]) @@ -14,3 +14,4 @@ api_router.include_router(prompts.router, prefix="/prompts", tags=["prompts"]) api_router.include_router(rules.router, prefix="/rules", tags=["rules"]) api_router.include_router(agent_tasks.router, prefix="/agent-tasks", tags=["agent-tasks"]) api_router.include_router(embedding_config.router, prefix="/embedding", tags=["embedding"]) +api_router.include_router(ssh_keys.router, prefix="/ssh-keys", tags=["ssh-keys"]) diff --git a/backend/app/api/v1/endpoints/agent_tasks.py b/backend/app/api/v1/endpoints/agent_tasks.py index 126b67d..b68b478 100644 --- a/backend/app/api/v1/endpoints/agent_tasks.py +++ b/backend/app/api/v1/endpoints/agent_tasks.py @@ -32,6 +32,8 @@ from app.models.user import User from app.models.user_config import UserConfig from app.services.agent.event_manager import EventManager from app.services.agent.streaming import StreamHandler, StreamEvent, StreamEventType +from app.services.git_ssh_service import GitSSHOperations +from app.core.encryption import decrypt_sensitive_data logger = logging.getLogger(__name__) router = APIRouter() @@ -288,13 +290,23 @@ async def _execute_agent_task(task_id: str): # 获取用户配置(需要在获取项目根目录之前,以便传递 token) user_config = await _get_user_config(db, task.created_by) - # 从用户配置中提取 token(用于私有仓库克隆) + # 从用户配置中提取 token和SSH密钥(用于私有仓库克隆) other_config = (user_config or {}).get('otherConfig', {}) github_token = other_config.get('githubToken') or settings.GITHUB_TOKEN gitlab_token = other_config.get('gitlabToken') or settings.GITLAB_TOKEN gitea_token = other_config.get('giteaToken') or settings.GITEA_TOKEN - # 获取项目根目录(传递任务指定的分支和认证 token) + # 解密SSH私钥 + ssh_private_key = None + if 'sshPrivateKey' in other_config: + try: + encrypted_key = other_config['sshPrivateKey'] + ssh_private_key = decrypt_sensitive_data(encrypted_key) + logger.info("成功解密SSH私钥") + except Exception as e: + logger.warning(f"解密SSH私钥失败: {e}") + + # 获取项目根目录(传递任务指定的分支和认证 token/SSH密钥) # 🔥 传递 event_emitter 以发送克隆进度 project_root = await _get_project_root( project, @@ -303,6 +315,7 @@ async def _execute_agent_task(task_id: str): github_token=github_token, gitlab_token=gitlab_token, gitea_token=gitea_token, # 🔥 新增 + ssh_private_key=ssh_private_key, # 🔥 新增SSH密钥 event_emitter=event_emitter, # 🔥 新增 ) @@ -2216,6 +2229,7 @@ async def _get_project_root( github_token: Optional[str] = None, gitlab_token: Optional[str] = None, gitea_token: Optional[str] = None, # 🔥 新增 + ssh_private_key: Optional[str] = None, # 🔥 新增:SSH私钥(用于SSH认证) event_emitter: Optional[Any] = None, # 🔥 新增:用于发送实时日志 ) -> str: """ @@ -2232,6 +2246,7 @@ async def _get_project_root( github_token: GitHub 访问令牌(用于私有仓库) gitlab_token: GitLab 访问令牌(用于私有仓库) gitea_token: Gitea 访问令牌(用于私有仓库) + ssh_private_key: SSH私钥(用于SSH认证) event_emitter: 事件发送器(用于发送实时日志) Returns: @@ -2307,6 +2322,9 @@ async def _get_project_root( await emit(f"🔄 正在获取仓库: {repo_url}") + # 检测是否为SSH URL(SSH链接不支持ZIP下载) + is_ssh_url = GitSSHOperations.is_ssh_url(repo_url) + # 解析仓库 URL 获取 owner/repo parsed = urlparse(repo_url) path_parts = parsed.path.strip('/').replace('.git', '').split('/') @@ -2329,7 +2347,12 @@ async def _get_project_root( last_error = "" # ============ 方案1: 优先使用 ZIP 下载(更快更稳定)============ - if owner and repo: + # SSH链接直接跳过ZIP下载,使用git clone + if is_ssh_url: + logger.info(f"检测到SSH URL,跳过ZIP下载,直接使用Git克隆") + await emit(f"🔑 检测到SSH认证,使用Git克隆...") + + if owner and repo and not is_ssh_url: import httpx for branch in branches_to_try: @@ -2437,8 +2460,12 @@ async def _get_project_root( # ============ 方案2: 回退到 git clone ============ if not download_success: - await emit(f"🔄 ZIP 下载失败,回退到 Git 克隆...") - logger.info("ZIP download failed, falling back to git clone") + if is_ssh_url: + # SSH链接直接使用git clone,不是"失败" + pass # 已在上面输出提示 + else: + await emit(f"🔄 ZIP 下载失败,回退到 Git 克隆...") + logger.info("ZIP download failed, falling back to git clone") # 检查 git 是否可用 try: @@ -2490,7 +2517,9 @@ async def _get_project_root( parsed.fragment )) await emit(f"🔐 使用 Gitea Token 认证") - + elif is_ssh_url and ssh_private_key: + await emit(f"🔐 使用 SSH Key 认证") + for branch in branches_to_try: check_cancelled() @@ -2502,36 +2531,68 @@ async def _get_project_root( await emit(f"🔄 尝试克隆分支: {branch}") try: - async def run_clone(): - return await asyncio.to_thread( - subprocess.run, - ["git", "clone", "--depth", "1", "--branch", branch, auth_url, base_path], - capture_output=True, - text=True, - timeout=120, - ) + # SSH URL使用GitSSHOperations(支持SSH密钥认证) + if is_ssh_url and ssh_private_key: + async def run_ssh_clone(): + return await asyncio.to_thread( + GitSSHOperations.clone_repo_with_ssh, + repo_url, ssh_private_key, base_path, branch + ) - clone_task = asyncio.create_task(run_clone()) - while not clone_task.done(): - check_cancelled() - try: - result = await asyncio.wait_for(asyncio.shield(clone_task), timeout=1.0) + clone_task = asyncio.create_task(run_ssh_clone()) + while not clone_task.done(): + check_cancelled() + try: + result = await asyncio.wait_for(asyncio.shield(clone_task), timeout=1.0) + break + except asyncio.TimeoutError: + continue + + if clone_task.done(): + result = clone_task.result() + + # GitSSHOperations返回字典格式 + if result.get('success'): + logger.info(f"✅ Git 克隆成功 (SSH, 分支: {branch})") + await emit(f"✅ 仓库获取成功 (SSH克隆, 分支: {branch})") + download_success = True break - except asyncio.TimeoutError: - continue - - if clone_task.done(): - result = clone_task.result() - - if result.returncode == 0: - logger.info(f"✅ Git 克隆成功 (分支: {branch})") - await emit(f"✅ 仓库获取成功 (Git克隆, 分支: {branch})") - download_success = True - break + else: + last_error = result.get('message', '未知错误') + logger.warning(f"SSH克隆失败 (分支 {branch}): {last_error[:200]}") + await emit(f"⚠️ 分支 {branch} SSH克隆失败...", "warning") else: - last_error = result.stderr - logger.warning(f"克隆失败 (分支 {branch}): {last_error[:200]}") - await emit(f"⚠️ 分支 {branch} 克隆失败...", "warning") + # HTTPS URL使用标准git clone + async def run_clone(): + return await asyncio.to_thread( + subprocess.run, + ["git", "clone", "--depth", "1", "--branch", branch, auth_url, base_path], + capture_output=True, + text=True, + timeout=120, + ) + + clone_task = asyncio.create_task(run_clone()) + while not clone_task.done(): + check_cancelled() + try: + result = await asyncio.wait_for(asyncio.shield(clone_task), timeout=1.0) + break + except asyncio.TimeoutError: + continue + + if clone_task.done(): + result = clone_task.result() + + if result.returncode == 0: + logger.info(f"✅ Git 克隆成功 (分支: {branch})") + await emit(f"✅ 仓库获取成功 (Git克隆, 分支: {branch})") + download_success = True + break + else: + last_error = result.stderr + logger.warning(f"克隆失败 (分支 {branch}): {last_error[:200]}") + await emit(f"⚠️ 分支 {branch} 克隆失败...", "warning") except subprocess.TimeoutExpired: last_error = f"克隆分支 {branch} 超时" logger.warning(last_error) @@ -2550,33 +2611,61 @@ async def _get_project_root( os.makedirs(base_path, exist_ok=True) try: - async def run_default_clone(): - return await asyncio.to_thread( - subprocess.run, - ["git", "clone", "--depth", "1", auth_url, base_path], - capture_output=True, - text=True, - timeout=120, - ) + # SSH URL使用GitSSHOperations(不指定分支) + if is_ssh_url and ssh_private_key: + async def run_default_ssh_clone(): + return await asyncio.to_thread( + GitSSHOperations.clone_repo_with_ssh, + repo_url, ssh_private_key, base_path, branch + ) - clone_task = asyncio.create_task(run_default_clone()) - while not clone_task.done(): - check_cancelled() - try: - result = await asyncio.wait_for(asyncio.shield(clone_task), timeout=1.0) - break - except asyncio.TimeoutError: - continue + clone_task = asyncio.create_task(run_default_ssh_clone()) + while not clone_task.done(): + check_cancelled() + try: + result = await asyncio.wait_for(asyncio.shield(clone_task), timeout=1.0) + break + except asyncio.TimeoutError: + continue - if clone_task.done(): - result = clone_task.result() + if clone_task.done(): + result = clone_task.result() - if result.returncode == 0: - logger.info(f"✅ Git 克隆成功 (默认分支)") - await emit(f"✅ 仓库获取成功 (Git克隆, 默认分支)") - download_success = True + if result.get('success'): + logger.info(f"✅ Git 克隆成功 (SSH, 默认分支)") + await emit(f"✅ 仓库获取成功 (SSH克隆, 默认分支)") + download_success = True + else: + last_error = result.get('message', '未知错误') else: - last_error = result.stderr + # HTTPS URL使用标准git clone + async def run_default_clone(): + return await asyncio.to_thread( + subprocess.run, + ["git", "clone", "--depth", "1", auth_url, base_path], + capture_output=True, + text=True, + timeout=120, + ) + + clone_task = asyncio.create_task(run_default_clone()) + while not clone_task.done(): + check_cancelled() + try: + result = await asyncio.wait_for(asyncio.shield(clone_task), timeout=1.0) + break + except asyncio.TimeoutError: + continue + + if clone_task.done(): + result = clone_task.result() + + if result.returncode == 0: + logger.info(f"✅ Git 克隆成功 (默认分支)") + await emit(f"✅ 仓库获取成功 (Git克隆, 默认分支)") + download_success = True + else: + last_error = result.stderr except subprocess.TimeoutExpired: last_error = "克隆超时" except asyncio.CancelledError: diff --git a/backend/app/api/v1/endpoints/projects.py b/backend/app/api/v1/endpoints/projects.py index cca092a..e26143a 100644 --- a/backend/app/api/v1/endpoints/projects.py +++ b/backend/app/api/v1/endpoints/projects.py @@ -404,22 +404,24 @@ async def get_project_files( # Handle Repository project if not project.repository_url: return [] - + # Get tokens from user config from sqlalchemy.future import select from app.core.encryption import decrypt_sensitive_data from app.core.config import settings + from app.services.git_ssh_service import GitSSHOperations + + SENSITIVE_OTHER_FIELDS = ['githubToken', 'gitlabToken', 'sshPrivateKey'] - SENSITIVE_OTHER_FIELDS = ['githubToken', 'gitlabToken'] - result = await db.execute( select(UserConfig).where(UserConfig.user_id == current_user.id) ) config = result.scalar_one_or_none() - + github_token = settings.GITHUB_TOKEN gitlab_token = settings.GITLAB_TOKEN - + ssh_private_key = None + if config and config.other_config: other_config = json.loads(config.other_config) for field in SENSITIVE_OTHER_FIELDS: @@ -429,20 +431,46 @@ async def get_project_files( github_token = decrypted_val elif field == 'gitlabToken': gitlab_token = decrypted_val + elif field == 'sshPrivateKey': + ssh_private_key = decrypted_val - repo_type = project.repository_type or "other" - # 使用传入的 branch 参数,如果没有则使用项目默认分支 + # 检查是否为SSH URL + is_ssh_url = GitSSHOperations.is_ssh_url(project.repository_url) target_branch = branch or project.default_branch or "main" - + try: - if repo_type == "github": - # 传入用户自定义排除模式 - repo_files = await get_github_files(project.repository_url, target_branch, github_token, parsed_exclude_patterns) - files = [{"path": f["path"], "size": 0} for f in repo_files] - elif repo_type == "gitlab": - # 传入用户自定义排除模式 - repo_files = await get_gitlab_files(project.repository_url, target_branch, gitlab_token, parsed_exclude_patterns) - files = [{"path": f["path"], "size": 0} for f in repo_files] + if is_ssh_url: + # 使用SSH方式获取文件列表 + if not ssh_private_key: + raise HTTPException( + status_code=400, + detail="仓库使用SSH URL,但未配置SSH密钥。请先在设置中生成SSH密钥。" + ) + + print(f"🔐 使用SSH方式获取文件列表: {project.repository_url}") + files_with_content = GitSSHOperations.get_repo_files_via_ssh( + project.repository_url, + ssh_private_key, + target_branch, + parsed_exclude_patterns + ) + files = [{"path": f["path"], "size": len(f.get("content", ""))} for f in files_with_content] + else: + # 使用API方式获取文件列表 + repo_type = project.repository_type or "other" + + if repo_type == "github": + # 传入用户自定义排除模式 + repo_files = await get_github_files(project.repository_url, target_branch, github_token, parsed_exclude_patterns) + files = [{"path": f["path"], "size": 0} for f in repo_files] + elif repo_type == "gitlab": + # 传入用户自定义排除模式 + repo_files = await get_gitlab_files(project.repository_url, target_branch, gitlab_token, parsed_exclude_patterns) + files = [{"path": f["path"], "size": 0} for f in repo_files] + else: + raise HTTPException(status_code=400, detail="不支持的仓库类型") + except HTTPException: + raise except Exception as e: print(f"Error fetching repo files: {e}") raise HTTPException(status_code=500, detail=f"无法获取仓库文件: {str(e)}") diff --git a/backend/app/api/v1/endpoints/ssh_keys.py b/backend/app/api/v1/endpoints/ssh_keys.py new file mode 100644 index 0000000..092e2e2 --- /dev/null +++ b/backend/app/api/v1/endpoints/ssh_keys.py @@ -0,0 +1,254 @@ +""" +SSH密钥管理API端点 +""" + +from typing import Any, Optional +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from pydantic import BaseModel +import json + +from app.api import deps +from app.db.session import get_db +from app.models.user import User +from app.models.user_config import UserConfig +from app.services.git_ssh_service import SSHKeyService, GitSSHOperations, clear_known_hosts +from app.core.encryption import encrypt_sensitive_data, decrypt_sensitive_data + +router = APIRouter() + + +# Schemas +class SSHKeyGenerateResponse(BaseModel): + public_key: str + message: str + + +class SSHKeyResponse(BaseModel): + has_key: bool + public_key: Optional[str] = None + fingerprint: Optional[str] = None + + +class SSHKeyTestRequest(BaseModel): + repo_url: str + + +class SSHKeyTestResponse(BaseModel): + success: bool + message: str + output: Optional[str] = None + + +@router.post("/generate", response_model=SSHKeyGenerateResponse) +async def generate_ssh_key( + *, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(deps.get_current_user), +) -> Any: + """ + 生成新的SSH密钥对 + + 生成RSA 4096格式的SSH密钥对,私钥加密存储在用户配置中,公钥返回给用户 + """ + try: + # 生成SSH密钥对 (RSA 4096) + private_key, public_key = SSHKeyService.generate_rsa_key(key_size=4096) + + # 获取或创建用户配置 + result = await db.execute( + select(UserConfig).where(UserConfig.user_id == current_user.id) + ) + user_config = result.scalar_one_or_none() + + if not user_config: + user_config = UserConfig( + user_id=current_user.id, + llm_config="{}", + other_config="{}" + ) + db.add(user_config) + + # 解析现有的other_config + other_config = json.loads(user_config.other_config) if user_config.other_config else {} + + # 加密并存储私钥 + encrypted_private_key = encrypt_sensitive_data(private_key) + other_config['sshPrivateKey'] = encrypted_private_key + other_config['sshPublicKey'] = public_key # 公钥不需要加密 + + # 更新配置 + user_config.other_config = json.dumps(other_config) + + await db.commit() + + # 计算公钥指纹 + fingerprint = SSHKeyService.get_public_key_fingerprint(public_key) + + return { + "public_key": public_key, + "fingerprint": fingerprint, + "message": "SSH密钥生成成功,请将公钥添加到您的GitHub/GitLab账户" + } + + except Exception as e: + raise HTTPException(status_code=500, detail=f"生成SSH密钥失败: {str(e)}") + + +@router.get("/", response_model=SSHKeyResponse) +async def get_ssh_key( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(deps.get_current_user), +) -> Any: + """ + 获取当前用户的SSH公钥 + """ + try: + # 获取用户配置 + result = await db.execute( + select(UserConfig).where(UserConfig.user_id == current_user.id) + ) + user_config = result.scalar_one_or_none() + + if not user_config or not user_config.other_config: + return {"has_key": False} + + # 解析配置 + other_config = json.loads(user_config.other_config) + + if 'sshPublicKey' in other_config: + public_key = other_config['sshPublicKey'] + fingerprint = SSHKeyService.get_public_key_fingerprint(public_key) + + return { + "has_key": True, + "public_key": public_key, + "fingerprint": fingerprint + } + else: + return {"has_key": False} + + except Exception as e: + raise HTTPException(status_code=500, detail=f"获取SSH密钥失败: {str(e)}") + + +@router.delete("/") +async def delete_ssh_key( + db: AsyncSession = Depends(get_db), + current_user: User = Depends(deps.get_current_user), +) -> Any: + """ + 删除当前用户的SSH密钥 + """ + try: + # 获取用户配置 + result = await db.execute( + select(UserConfig).where(UserConfig.user_id == current_user.id) + ) + user_config = result.scalar_one_or_none() + + if not user_config or not user_config.other_config: + raise HTTPException(status_code=404, detail="未找到SSH密钥") + + # 解析配置 + other_config = json.loads(user_config.other_config) + + # 删除SSH密钥 + if 'sshPrivateKey' in other_config: + del other_config['sshPrivateKey'] + if 'sshPublicKey' in other_config: + del other_config['sshPublicKey'] + + # 更新配置 + user_config.other_config = json.dumps(other_config) + await db.commit() + + return {"message": "SSH密钥已删除"} + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"删除SSH密钥失败: {str(e)}") + + +@router.post("/test", response_model=SSHKeyTestResponse) +async def test_ssh_key( + *, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(deps.get_current_user), + test_request: SSHKeyTestRequest, +) -> Any: + """ + 测试SSH密钥是否有效 + + Args: + test_request: 包含repo_url的测试请求 + """ + try: + # 获取用户配置 + result = await db.execute( + select(UserConfig).where(UserConfig.user_id == current_user.id) + ) + user_config = result.scalar_one_or_none() + + if not user_config or not user_config.other_config: + raise HTTPException(status_code=404, detail="未找到SSH密钥,请先生成SSH密钥") + + # 解析配置 + other_config = json.loads(user_config.other_config) + + if 'sshPrivateKey' not in other_config: + raise HTTPException(status_code=404, detail="未找到SSH密钥,请先生成SSH密钥") + + # 解密私钥 + private_key = decrypt_sensitive_data(other_config['sshPrivateKey']) + public_key = other_config.get('sshPublicKey', '') + + # 验证密钥对是否匹配 + is_valid = SSHKeyService.verify_key_pair(private_key, public_key) + print(f"[SSH Test API] Key pair valid: {is_valid}") + + if not is_valid: + return { + "success": False, + "message": "密钥对验证失败:私钥和公钥不匹配", + "output": "请重新生成SSH密钥" + } + + # 测试SSH连接 + result = GitSSHOperations.test_ssh_key(test_request.repo_url, private_key) + + return result + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"测试SSH密钥失败: {str(e)}") + + +@router.delete("/known-hosts") +async def clear_known_hosts_file( + current_user: User = Depends(deps.get_current_user), +) -> Any: + """ + 清理known_hosts文件 + + 清空SSH known_hosts文件中保存的所有主机密钥。 + 下次连接时会重新接受并保存新的host key。 + """ + try: + success = clear_known_hosts() + + if success: + return { + "success": True, + "message": "known_hosts文件已清理,下次连接时会重新保存主机密钥" + } + else: + raise HTTPException(status_code=500, detail="清理known_hosts文件失败") + + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"清理失败: {str(e)}") diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 061bfeb..47cd6d8 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -90,6 +90,9 @@ class Settings(BaseSettings): # 向量数据库配置 VECTOR_DB_PATH: str = "./data/vector_db" # 向量数据库持久化目录 + + # SSH配置 + SSH_CONFIG_PATH: str = "./data/ssh" # SSH配置目录(存储known_hosts等) # Agent 配置 AGENT_MAX_ITERATIONS: int = 50 # Agent 最大迭代次数 diff --git a/backend/app/services/git_ssh_service.py b/backend/app/services/git_ssh_service.py new file mode 100644 index 0000000..4d7f9fa --- /dev/null +++ b/backend/app/services/git_ssh_service.py @@ -0,0 +1,573 @@ +""" +Git SSH服务 - 生成SSH密钥并使用SSH方式访问Git仓库 +""" + +import os +import sys +import tempfile +import subprocess +import shutil +import hashlib +import base64 +from typing import Tuple, Optional, Dict, List +from pathlib import Path +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519, rsa +from cryptography.hazmat.backends import default_backend + + +def get_ssh_config_dir() -> str: + """ + 获取SSH配置目录路径,如果不存在则创建 + + Returns: + SSH配置目录的绝对路径 + """ + from app.core.config import settings + + ssh_config_path = Path(settings.SSH_CONFIG_PATH) + + # 确保目录存在 + ssh_config_path.mkdir(parents=True, exist_ok=True) + + # 设置目录权限(仅所有者可访问) + if sys.platform != 'win32': + os.chmod(ssh_config_path, 0o700) + + return str(ssh_config_path.absolute()) + + +def get_known_hosts_file() -> str: + """ + 获取known_hosts文件路径,如果不存在则创建 + + Returns: + known_hosts文件的绝对路径 + """ + ssh_config_dir = get_ssh_config_dir() + known_hosts_file = Path(ssh_config_dir) / 'known_hosts' + + # 如果文件不存在则创建 + if not known_hosts_file.exists(): + known_hosts_file.touch() + # 设置文件权限 + if sys.platform != 'win32': + os.chmod(known_hosts_file, 0o600) + + return str(known_hosts_file.absolute()) + + +def clear_known_hosts() -> bool: + """ + 清理known_hosts文件内容 + + Returns: + 是否清理成功 + """ + try: + known_hosts_file = get_known_hosts_file() + # 清空文件内容 + with open(known_hosts_file, 'w') as f: + f.write('') + print(f"[SSH] Cleared known_hosts file: {known_hosts_file}") + return True + except Exception as e: + print(f"[SSH] Failed to clear known_hosts: {e}") + return False + + +def set_secure_file_permissions(file_path: str): + """ + 设置文件的安全权限(Unix: 0600, Windows: 只有当前用户可读写) + + Args: + file_path: 文件路径 + """ + if sys.platform == 'win32': + # Windows系统使用icacls命令设置权限 + try: + # 移除所有继承的权限 + subprocess.run( + ['icacls', file_path, '/inheritance:r'], + capture_output=True, + check=True + ) + # 只给当前用户完全控制权限 + subprocess.run( + ['icacls', file_path, '/grant:r', f'{os.environ.get("USERNAME")}:(F)'], + capture_output=True, + check=True + ) + except Exception as e: + print(f"Warning: Failed to set Windows file permissions: {e}") + # 尝试使用os.chmod作为后备方案 + try: + os.chmod(file_path, 0o600) + except: + pass + else: + # Unix/Linux/Mac系统 + os.chmod(file_path, 0o600) + + +class SSHKeyService: + """SSH密钥服务""" + + @staticmethod + def get_public_key_fingerprint(public_key: str) -> Optional[str]: + """ + 计算SSH公钥的SHA256指纹 + + Args: + public_key: SSH公钥(OpenSSH格式) + + Returns: + SHA256指纹字符串,格式如: SHA256:Js1ypfoB+N2IfrCGgSj81vHnK4F/XxUV6Y9KUwKoFx8 + """ + try: + # 解析公钥 (格式: ssh-ed25519 AAAAC3Nza...) + parts = public_key.strip().split() + if len(parts) < 2: + return None + + # 获取base64编码的公钥数据 + key_data = parts[1] + + # 解码base64 + key_bytes = base64.b64decode(key_data) + + # 计算SHA256哈希 + sha256_hash = hashlib.sha256(key_bytes).digest() + + # 转换为base64(无填充) + fingerprint = base64.b64encode(sha256_hash).decode('utf-8').rstrip('=') + + return f"SHA256:{fingerprint}" + + except Exception as e: + print(f"[SSH] Fingerprint calculation error: {e}") + return None + + @staticmethod + def verify_key_pair(private_key: str, public_key: str) -> bool: + """ + 验证私钥和公钥是否匹配 + + Args: + private_key: SSH私钥(支持传统RSA PEM格式或OpenSSH格式) + public_key: SSH公钥(OpenSSH格式) + + Returns: + 是否匹配 + """ + try: + from cryptography.hazmat.primitives.serialization import ( + load_ssh_private_key, + load_pem_private_key + ) + from cryptography.hazmat.backends import default_backend + + # 尝试加载私钥(支持多种格式) + private_key_bytes = private_key.encode('utf-8') + private_key_obj = None + + # 首先尝试作为OpenSSH格式加载 + try: + private_key_obj = load_ssh_private_key( + private_key_bytes, + password=None, + backend=default_backend() + ) + except Exception: + # 如果失败,尝试作为传统PEM格式加载(支持RSA、DSA、EC等) + try: + private_key_obj = load_pem_private_key( + private_key_bytes, + password=None, + backend=default_backend() + ) + except Exception as e: + print(f"[SSH] Failed to load private key: {e}") + return False + + if not private_key_obj: + return False + + # 从私钥导出公钥 + derived_public_key = private_key_obj.public_key() + derived_public_bytes = derived_public_key.public_bytes( + encoding=serialization.Encoding.OpenSSH, + format=serialization.PublicFormat.OpenSSH + ).decode('utf-8').strip() + + # 比较(去除可能的注释部分) + expected_public = public_key.split()[0] + ' ' + public_key.split()[1] + actual_public = derived_public_bytes.split()[0] + ' ' + derived_public_bytes.split()[1] + + return expected_public == actual_public + + except Exception as e: + print(f"[SSH] Key verification error: {e}") + return False + + @staticmethod + def generate_rsa_key(key_size: int = 4096) -> Tuple[str, str]: + """ + 生成RSA SSH密钥对 + + Args: + key_size: RSA密钥大小(比特),默认4096 + + Returns: + (private_key, public_key): 私钥和公钥的元组,私钥使用传统PEM格式 + """ + # 生成RSA私钥 + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=key_size, + backend=default_backend() + ) + + # 序列化私钥为传统PEM格式(BEGIN RSA PRIVATE KEY,兼容性更好) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption() + ).decode('utf-8') + + # 获取公钥并序列化为OpenSSH格式 + public_key = private_key.public_key() + public_openssh = public_key.public_bytes( + encoding=serialization.Encoding.OpenSSH, + format=serialization.PublicFormat.OpenSSH + ).decode('utf-8') + + return private_pem, public_openssh + + @staticmethod + def generate_ed25519_key() -> Tuple[str, str]: + """ + 生成ED25519 SSH密钥对(备用方法,默认使用RSA) + + Returns: + (private_key, public_key): 私钥和公钥的元组,都是OpenSSH格式 + """ + # 生成ED25519私钥 + private_key = ed25519.Ed25519PrivateKey.generate() + + # 序列化私钥为OpenSSH格式 + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.OpenSSH, + encryption_algorithm=serialization.NoEncryption() + ).decode('utf-8') + + # 获取公钥并序列化为OpenSSH格式 + public_key = private_key.public_key() + public_openssh = public_key.public_bytes( + encoding=serialization.Encoding.OpenSSH, + format=serialization.PublicFormat.OpenSSH + ).decode('utf-8') + + return private_pem, public_openssh + + +class GitSSHOperations: + """Git SSH操作类 - 使用SSH密钥克隆和拉取仓库""" + + @staticmethod + def is_ssh_url(url: str) -> bool: + """ + 判断URL是否为SSH格式 + + Args: + url: Git仓库URL + + Returns: + 是否为SSH URL + """ + return url.startswith('git@') or url.startswith('ssh://') + + @staticmethod + def clone_repo_with_ssh(repo_url: str, private_key: str, target_dir: str, branch: str = None) -> Dict[str, any]: + """ + 使用SSH密钥克隆Git仓库 + + Args: + repo_url: SSH格式的Git URL (例如: git@github.com:user/repo.git) + private_key: SSH私钥内容 + target_dir: 目标目录 + branch: 分支名称 + + Returns: + 操作结果字典 + """ + temp_dir = None + try: + # 创建临时目录存放SSH密钥 + temp_dir = tempfile.mkdtemp(prefix='deepaudit_ssh_') + key_file = os.path.join(temp_dir, 'id_rsa') + + # 写入私钥 + with open(key_file, 'w') as f: + f.write(private_key) + set_secure_file_permissions(key_file) + + # 使用持久化的known_hosts文件 + known_hosts_file = get_known_hosts_file() + + # 设置Git SSH命令,只使用DeepAudit生成的SSH密钥 + env = os.environ.copy() + + # 构建SSH命令(只使用DeepAudit密钥) + ssh_cmd_parts = [ + 'ssh', + '-i', key_file, + '-o', 'StrictHostKeyChecking=accept-new', # 首次连接时自动接受并保存host key + '-o', f'UserKnownHostsFile={known_hosts_file}', # 使用持久化known_hosts文件 + '-o', 'PreferredAuthentications=publickey', + '-o', 'IdentitiesOnly=yes' # 只使用指定的密钥,不使用系统默认密钥 + ] + + env['GIT_SSH_COMMAND'] = ' '.join(ssh_cmd_parts) + print(f"[Git Clone] Using DeepAudit SSH key: {key_file}") + print(f"[Git Clone] Using known_hosts file: {known_hosts_file}") + + # 执行git clone + cmd = ['git', 'clone', '--depth', '1'] + if branch: # 只有明确指定分支时才添加 + cmd.extend(['--branch', branch]) + cmd.extend([repo_url, target_dir]) + + result = subprocess.run( + cmd, + env=env, + capture_output=True, + text=True, + timeout=300 + ) + + if result.returncode == 0: + return { + 'success': True, + 'message': '仓库克隆成功', + 'path': target_dir + } + else: + return { + 'success': False, + 'message': '仓库克隆失败', + 'error': result.stderr + } + + except subprocess.TimeoutExpired: + return {'success': False, 'message': '克隆超时(超过5分钟)'} + except Exception as e: + return {'success': False, 'message': f'克隆失败: {str(e)}'} + finally: + # 清理临时文件 + if temp_dir and os.path.exists(temp_dir): + shutil.rmtree(temp_dir, ignore_errors=True) + + @staticmethod + def get_repo_files_via_ssh(repo_url: str, private_key: str, branch: str = "main", + exclude_patterns: List[str] = None) -> List[Dict[str, str]]: + """ + 通过SSH克隆仓库并获取文件列表 + + Args: + repo_url: SSH格式的Git URL + private_key: SSH私钥 + branch: 分支名称 + exclude_patterns: 排除模式列表 + + Returns: + 文件列表,每个文件包含path和内容 + """ + temp_clone_dir = None + try: + # 创建临时克隆目录 + temp_clone_dir = tempfile.mkdtemp(prefix='deepaudit_clone_') + + # 克隆仓库 + clone_result = GitSSHOperations.clone_repo_with_ssh( + repo_url, private_key, temp_clone_dir, branch + ) + + if not clone_result['success']: + raise Exception(f"克隆仓库失败: {clone_result.get('error', '')}") + + # 扫描目录获取文件列表 + from app.services.scanner import is_text_file, should_exclude + + files = [] + for root, dirs, filenames in os.walk(temp_clone_dir): + # 排除.git目录 + if '.git' in dirs: + dirs.remove('.git') + + for filename in filenames: + file_path = os.path.join(root, filename) + # 获取相对路径 + rel_path = os.path.relpath(file_path, temp_clone_dir) + + # 检查是否应该排除 + if should_exclude(rel_path, exclude_patterns): + continue + + # 只处理文本文件 + if not is_text_file(rel_path): + continue + + try: + # 读取文件内容 + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + + files.append({ + 'path': rel_path.replace('\\', '/'), # 统一使用/作为路径分隔符 + 'content': content + }) + except Exception as e: + print(f"读取文件 {rel_path} 失败: {e}") + continue + + return files + + except Exception as e: + print(f"获取SSH仓库文件失败: {e}") + raise + finally: + # 清理临时克隆目录 + if temp_clone_dir and os.path.exists(temp_clone_dir): + shutil.rmtree(temp_clone_dir, ignore_errors=True) + + @staticmethod + def test_ssh_key(repo_url: str, private_key: str) -> Dict[str, any]: + """ + 测试SSH密钥是否有效 + + Args: + repo_url: SSH格式的Git URL + private_key: SSH私钥 + + Returns: + 测试结果字典 + """ + temp_dir = None + try: + # 从URL提取主机 + if '@' in repo_url: + host_part = repo_url.split('@')[1].split(':')[0] + else: + return {'success': False, 'message': 'URL格式无效'} + + # 创建临时目录存放密钥 + temp_dir = tempfile.mkdtemp(prefix='deepaudit_ssh_test_') + key_file = os.path.join(temp_dir, 'id_rsa') + + # 写入私钥 + with open(key_file, 'w') as f: + f.write(private_key) + + # 验证文件是否被创建 + if not os.path.exists(key_file): + return {'success': False, 'message': f'私钥文件创建失败: {key_file}'} + + file_size = os.path.getsize(key_file) + + set_secure_file_permissions(key_file) + + # 使用持久化的known_hosts文件 + known_hosts_file = get_known_hosts_file() + + # 构建SSH命令(只使用DeepAudit密钥) + cmd = [ + 'ssh', + '-i', key_file, + '-o', 'StrictHostKeyChecking=accept-new', # 首次连接时自动接受并保存host key + '-o', f'UserKnownHostsFile={known_hosts_file}', # 使用持久化known_hosts文件 + '-o', 'ConnectTimeout=10', + '-o', 'PreferredAuthentications=publickey', + '-o', 'IdentitiesOnly=yes', # 只使用指定的密钥,不使用系统默认密钥 + '-v', # 详细输出 + '-T', f'git@{host_part}' + ] + + print(f"[SSH Test] Using known_hosts file: {known_hosts_file}") + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=15 + ) + + # GitHub/GitLab/CodeUp的SSH测试通常返回非0状态码,但会在输出中显示认证成功 + output = result.stdout + result.stderr + output_lower = output.lower() + + + # 特别检查Anonymous(表示公钥未添加或未关联用户账户) + # 必须在检查成功之前检查,因为Anonymous表示认证技术上成功但没有关联用户 + if 'anonymous' in output_lower: + return { + 'success': True, + 'message': 'SSH连接成功,但公钥未关联用户账户', + 'output': f'提示:服务器显示Anonymous,在使用部署密钥时是正常现象。\n请在Git服务的设置中添加SSH公钥。\n\n原始输出:\n{output}' + } + + # 检查是否认证成功 + success_indicators = [ + ('successfully authenticated', True), # GitHub + ('hi ', True), # GitHub: "Hi username!" + ('welcome to gitlab', '@' in output), # GitLab需要有@username + ('welcome to codeup', '@' in output), # CodeUp需要有@username + ] + + is_success = False + for indicator, extra_check in success_indicators: + if indicator in output_lower: + if extra_check is True or extra_check: + is_success = True + break + + if is_success: + return { + 'success': True, + 'message': 'SSH密钥验证成功', + 'output': output + } + else: + # 提供更详细的错误信息 + error_msg = 'SSH密钥验证失败' + if 'permission denied' in output_lower: + error_msg = 'SSH密钥验证失败:权限被拒绝,请确认公钥已添加到Git服务' + elif 'connection refused' in output_lower: + error_msg = 'SSH连接被拒绝,请检查网络连接' + elif 'no route to host' in output_lower: + error_msg = 'SSH连接失败:无法连接到主机' + elif not output.strip(): + error_msg = 'SSH连接失败:未收到任何响应' + + return { + 'success': False, + 'message': error_msg, + 'output': output if output.strip() else '未收到任何响应' + } + + except subprocess.TimeoutExpired: + return { + 'success': False, + 'message': 'SSH连接超时(15秒)', + 'output': '连接超时,请检查网络或Git服务可用性' + } + except Exception as e: + return { + 'success': False, + 'message': f'测试失败: {str(e)}', + 'output': str(e) + } + finally: + if temp_dir and os.path.exists(temp_dir): + shutil.rmtree(temp_dir, ignore_errors=True) diff --git a/backend/app/services/scanner.py b/backend/app/services/scanner.py index 7a52d72..4bf2892 100644 --- a/backend/app/services/scanner.py +++ b/backend/app/services/scanner.py @@ -356,51 +356,83 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N + # 获取SSH私钥(如果配置了) + ssh_private_key = None + if 'sshPrivateKey' in user_other_config: + from app.core.encryption import decrypt_sensitive_data + ssh_private_key = decrypt_sensitive_data(user_other_config['sshPrivateKey']) + files: List[Dict[str, str]] = [] extracted_gitlab_token = None - last_error = None - actual_branch = branch - - # 构造尝试的分支列表 - branches_to_try = [branch] - if branch not in ["main", "master"]: - branches_to_try.extend(["main", "master"]) - branches_to_try = list(dict.fromkeys(branches_to_try)) - for try_branch in branches_to_try: + # 检查是否为SSH URL + from app.services.git_ssh_service import GitSSHOperations + is_ssh_url = GitSSHOperations.is_ssh_url(repo_url) + + if is_ssh_url: + # 使用SSH方式获取文件 + if not ssh_private_key: + raise Exception("仓库使用SSH URL,但未配置SSH密钥。请先生成并配置SSH密钥。") + + print(f"🔐 使用SSH方式访问仓库: {repo_url}") try: - if repo_type == "github": - files = await get_github_files(repo_url, try_branch, github_token, task_exclude_patterns) - elif repo_type == "gitlab": - files = await get_gitlab_files(repo_url, try_branch, gitlab_token, task_exclude_patterns) - # GitLab文件可能带有token - if files and 'token' in files[0]: - extracted_gitlab_token = files[0].get('token') - elif repo_type == "gitea": - files = await get_gitea_files(repo_url, try_branch, gitea_token, task_exclude_patterns) - else: - raise Exception("不支持的仓库类型,仅支持 GitHub, GitLab 和 Gitea 仓库") - - if files: - actual_branch = try_branch - if try_branch != branch: - print(f"⚠️ 分支 {branch} 不存在或无法访问,已降级到分支 {try_branch}") - break + files_with_content = GitSSHOperations.get_repo_files_via_ssh( + repo_url, ssh_private_key, branch, task_exclude_patterns + ) + # 转换为统一格式 + files = [{'path': f['path'], 'content': f['content']} for f in files_with_content] + actual_branch = branch + print(f"✅ 通过SSH成功获取 {len(files)} 个文件") except Exception as e: - last_error = str(e) - print(f"⚠️ 获取分支 {try_branch} 失败: {last_error[:100]}") - continue + raise Exception(f"SSH方式获取仓库文件失败: {str(e)}") + else: + # 使用API方式获取文件(原有逻辑) + # 构建分支尝试顺序(分支降级机制) + branches_to_try = [branch] + if project.default_branch and project.default_branch != branch: + branches_to_try.append(project.default_branch) + for common_branch in ["main", "master"]: + if common_branch not in branches_to_try: + branches_to_try.append(common_branch) - if not files: - error_msg = f"无法获取仓库文件,所有分支尝试均失败" - if last_error: - if "404" in last_error or "Not Found" in last_error: - error_msg = f"仓库或分支不存在: {branch}" - elif "401" in last_error or "403" in last_error: - error_msg = "无访问权限,请检查 Token 配置" - else: - error_msg = f"获取文件失败: {last_error[:100]}" - raise Exception(error_msg) + actual_branch = branch # 实际使用的分支 + last_error = None + + for try_branch in branches_to_try: + try: + print(f"🔄 尝试获取分支 {try_branch} 的文件列表...") + if repo_type == "github": + files = await get_github_files(repo_url, try_branch, github_token, task_exclude_patterns) + elif repo_type == "gitlab": + files = await get_gitlab_files(repo_url, try_branch, gitlab_token, task_exclude_patterns) + # GitLab文件可能带有token + if files and 'token' in files[0]: + extracted_gitlab_token = files[0].get('token') + elif repo_type == "gitea": + files = await get_gitea_files(repo_url, try_branch, gitea_token, task_exclude_patterns) + else: + raise Exception("不支持的仓库类型,仅支持 GitHub, GitLab 和 Gitea 仓库") + + if files: + actual_branch = try_branch + if try_branch != branch: + print(f"⚠️ 分支 {branch} 不存在或无法访问,已降级到分支 {try_branch}") + break + except Exception as e: + last_error = str(e) + print(f"⚠️ 获取分支 {try_branch} 失败: {last_error[:100]}") + continue + + if not files: + error_msg = f"无法获取仓库文件,所有分支尝试均失败" + if last_error: + if "404" in last_error or "Not Found" in last_error: + error_msg = f"仓库或分支不存在: {branch}" + elif "401" in last_error or "403" in last_error: + error_msg = "无访问权限,请检查 Token 配置" + else: + error_msg = f"获取文件失败: {last_error[:100]}" + raise Exception(error_msg) print(f"✅ 成功获取分支 {actual_branch} 的文件列表") @@ -450,25 +482,32 @@ async def scan_repo_task(task_id: str, db_session_factory, user_config: dict = N try: # 获取文件内容 - headers = {} - # 使用提取的 token 或用户配置的 token - - if repo_type == "gitlab": - token_to_use = file_info.get('token') or gitlab_token - if token_to_use: - headers["PRIVATE-TOKEN"] = token_to_use - elif repo_type == "gitea": - token_to_use = file_info.get('token') or gitea_token - if token_to_use: - headers["Authorization"] = f"token {token_to_use}" - elif repo_type == "github": - # GitHub raw URL 也是直接下载,通常public不需要token,private需要 - # GitHub raw user content url: raw.githubusercontent.com - if github_token: - headers["Authorization"] = f"Bearer {github_token}" - - print(f"📥 正在获取文件: {file_info['path']}") - content = await fetch_file_content(file_info["url"], headers) + + if is_ssh_url: + # SSH方式已经包含了文件内容 + content = file_info.get('content', '') + print(f"📥 正在处理SSH文件: {file_info['path']}") + else: + headers = {} + # 使用提取的 token 或用户配置的 token + + if repo_type == "gitlab": + token_to_use = file_info.get('token') or gitlab_token + if token_to_use: + headers["PRIVATE-TOKEN"] = token_to_use + elif repo_type == "gitea": + token_to_use = file_info.get('token') or gitea_token + if token_to_use: + headers["Authorization"] = f"token {token_to_use}" + elif repo_type == "github": + # GitHub raw URL 也是直接下载,通常public不需要token,private需要 + # GitHub raw user content url: raw.githubusercontent.com + if github_token: + headers["Authorization"] = f"Bearer {github_token}" + + print(f"📥 正在获取文件: {file_info['path']}") + content = await fetch_file_content(file_info["url"], headers) + if not content or not content.strip(): print(f"⚠️ 文件内容为空,跳过: {file_info['path']}") skipped_files += 1 diff --git a/frontend/src/pages/Account.tsx b/frontend/src/pages/Account.tsx index 9e2d14c..9cfe24d 100644 --- a/frontend/src/pages/Account.tsx +++ b/frontend/src/pages/Account.tsx @@ -10,6 +10,7 @@ import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; import { Avatar, AvatarFallback, AvatarImage } from "@/components/ui/avatar"; import { AlertDialog, AlertDialogAction, AlertDialogCancel, AlertDialogContent, AlertDialogDescription, AlertDialogFooter, AlertDialogHeader, AlertDialogTitle } from "@/components/ui/alert-dialog"; +import { Textarea } from "@/components/ui/textarea"; import { User, Mail, @@ -21,11 +22,17 @@ import { LogOut, UserPlus, GitBranch, - Terminal + Terminal, + Key, + Copy, + Trash2, + CheckCircle2, + ServerCrash } from "lucide-react"; import { apiClient } from "@/shared/api/serverClient"; import { toast } from "sonner"; import type { Profile } from "@/shared/types"; +import { generateSSHKey, getSSHKey, deleteSSHKey, testSSHKey, clearKnownHosts } from "@/shared/api/sshKeys"; export default function Account() { const navigate = useNavigate(); @@ -46,8 +53,18 @@ export default function Account() { }); const [changingPassword, setChangingPassword] = useState(false); + // SSH Key states + const [sshKey, setSSHKey] = useState<{ has_key: boolean; public_key?: string; fingerprint?: string }>({ has_key: false }); + const [generatingKey, setGeneratingKey] = useState(false); + const [deletingKey, setDeletingKey] = useState(false); + const [clearingKnownHosts, setClearingKnownHosts] = useState(false); + const [testingKey, setTestingKey] = useState(false); + const [testRepoUrl, setTestRepoUrl] = useState(""); + const [showDeleteKeyDialog, setShowDeleteKeyDialog] = useState(false); + useEffect(() => { loadProfile(); + loadSSHKey(); }, []); const loadProfile = async () => { @@ -69,6 +86,101 @@ export default function Account() { } }; + const loadSSHKey = async () => { + try { + const data = await getSSHKey(); + setSSHKey(data); + } catch (error) { + console.error('Failed to load SSH key:', error); + } + }; + + const handleGenerateSSHKey = async () => { + try { + setGeneratingKey(true); + const data = await generateSSHKey(); + setSSHKey({ has_key: true, public_key: data.public_key, fingerprint: data.fingerprint }); + toast.success(data.message); + } catch (error: any) { + console.error('Failed to generate SSH key:', error); + toast.error(error.response?.data?.detail || "生成SSH密钥失败"); + } finally { + setGeneratingKey(false); + } + }; + + const handleDeleteSSHKey = async () => { + try { + setDeletingKey(true); + await deleteSSHKey(); + setSSHKey({ has_key: false }); + toast.success("SSH密钥已删除"); + setShowDeleteKeyDialog(false); + } catch (error: any) { + console.error('Failed to delete SSH key:', error); + toast.error(error.response?.data?.detail || "删除SSH密钥失败"); + } finally { + setDeletingKey(false); + } + }; + + const handleTestSSHKey = async () => { + if (!testRepoUrl) { + toast.error("请输入仓库URL"); + return; + } + try { + setTestingKey(true); + const result = await testSSHKey(testRepoUrl); + if (result.success) { + toast.success("SSH连接测试成功"); + // 在控制台输出详细信息 + if (result.output) { + console.log("SSH测试输出:", result.output); + } + } else { + // 显示详细的错误信息 + toast.error(result.message || "SSH连接测试失败", { + description: result.output ? `详情: ${result.output.substring(0, 100)}...` : undefined, + duration: 5000, + }); + // 在控制台输出完整错误信息 + if (result.output) { + console.error("SSH测试失败:", result.output); + } + } + } catch (error: any) { + console.error('Failed to test SSH key:', error); + toast.error(error.response?.data?.detail || "测试SSH密钥失败"); + } finally { + setTestingKey(false); + } + }; + + const handleClearKnownHosts = async () => { + try { + setClearingKnownHosts(true); + const result = await clearKnownHosts(); + if (result.success) { + toast.success(result.message || "known_hosts已清理"); + } else { + toast.error("清理known_hosts失败"); + } + } catch (error: any) { + console.error('Failed to clear known_hosts:', error); + toast.error(error.response?.data?.detail || "清理known_hosts失败"); + } finally { + setClearingKnownHosts(false); + } + }; + + const handleCopyPublicKey = () => { + if (sshKey.public_key) { + navigator.clipboard.writeText(sshKey.public_key); + toast.success("公钥已复制到剪贴板"); + } + }; + const handleSave = async () => { try { setSaving(true); @@ -309,6 +421,161 @@ export default function Account() { + {/* SSH Key Management */} +
+
+ +

SSH 密钥管理

+
+
+
+
+
+ +
+
+
+

+ 使用 SSH 密钥访问 Git 仓库 +

+

+ 生成 SSH 密钥对后,将公钥添加到 GitHub/GitLab,即可使用 SSH URL 访问私有仓库。私钥将被加密存储。 +

+
+
+ + {!sshKey.has_key ? ( +
+
+ +
+

尚未生成 SSH 密钥

+ +
+ ) : ( +
+ {/* Public Key Display */} +
+
+ + +
+