From b030381ad2ef5b1d846d7cbac084f3174c7af60d Mon Sep 17 00:00:00 2001 From: lintsinghua Date: Fri, 26 Dec 2025 20:34:47 +0800 Subject: [PATCH] =?UTF-8?q?feat(ssh):=20=E5=A2=9E=E5=8A=A0SSH=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E9=85=8D=E7=BD=AE=E5=B9=B6=E6=94=B9=E8=BF=9B=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在配置中添加SSH_CLONE_TIMEOUT、SSH_TEST_TIMEOUT和SSH_CONNECT_TIMEOUT - 替换print为logging记录关键操作和错误 - 改进SSH命令构建方式防止注入 - 添加分支名验证逻辑 - 优化错误消息显示,避免暴露敏感信息 --- backend/app/api/v1/endpoints/ssh_keys.py | 19 ++- backend/app/core/config.py | 3 + backend/app/services/git_ssh_service.py | 152 ++++++++++++++++------- 3 files changed, 125 insertions(+), 49 deletions(-) diff --git a/backend/app/api/v1/endpoints/ssh_keys.py b/backend/app/api/v1/endpoints/ssh_keys.py index 092e2e2..81f067a 100644 --- a/backend/app/api/v1/endpoints/ssh_keys.py +++ b/backend/app/api/v1/endpoints/ssh_keys.py @@ -2,6 +2,7 @@ SSH密钥管理API端点 """ +import logging from typing import Any, Optional from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.ext.asyncio import AsyncSession @@ -17,6 +18,7 @@ from app.services.git_ssh_service import SSHKeyService, GitSSHOperations, clear_ from app.core.encryption import encrypt_sensitive_data, decrypt_sensitive_data router = APIRouter() +logger = logging.getLogger(__name__) # Schemas @@ -93,7 +95,8 @@ async def generate_ssh_key( } except Exception as e: - raise HTTPException(status_code=500, detail=f"生成SSH密钥失败: {str(e)}") + logger.error(f"Failed to generate SSH key for user {current_user.id}: {e}") + raise HTTPException(status_code=500, detail="生成SSH密钥失败,请稍后重试") @router.get("/", response_model=SSHKeyResponse) @@ -130,7 +133,8 @@ async def get_ssh_key( return {"has_key": False} except Exception as e: - raise HTTPException(status_code=500, detail=f"获取SSH密钥失败: {str(e)}") + logger.error(f"Failed to get SSH key for user {current_user.id}: {e}") + raise HTTPException(status_code=500, detail="获取SSH密钥失败,请稍后重试") @router.delete("/") @@ -169,7 +173,8 @@ async def delete_ssh_key( except HTTPException: raise except Exception as e: - raise HTTPException(status_code=500, detail=f"删除SSH密钥失败: {str(e)}") + logger.error(f"Failed to delete SSH key for user {current_user.id}: {e}") + raise HTTPException(status_code=500, detail="删除SSH密钥失败,请稍后重试") @router.post("/test", response_model=SSHKeyTestResponse) @@ -207,7 +212,7 @@ async def test_ssh_key( # 验证密钥对是否匹配 is_valid = SSHKeyService.verify_key_pair(private_key, public_key) - print(f"[SSH Test API] Key pair valid: {is_valid}") + logger.debug(f"Key pair validation result: {is_valid}") if not is_valid: return { @@ -224,7 +229,8 @@ async def test_ssh_key( except HTTPException: raise except Exception as e: - raise HTTPException(status_code=500, detail=f"测试SSH密钥失败: {str(e)}") + logger.error(f"Failed to test SSH key for user {current_user.id}: {e}") + raise HTTPException(status_code=500, detail="测试SSH密钥失败,请稍后重试") @router.delete("/known-hosts") @@ -251,4 +257,5 @@ async def clear_known_hosts_file( except HTTPException: raise except Exception as e: - raise HTTPException(status_code=500, detail=f"清理失败: {str(e)}") + logger.error(f"Failed to clear known_hosts for user {current_user.id}: {e}") + raise HTTPException(status_code=500, detail="清理失败,请稍后重试") diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 47cd6d8..2db4ae8 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -93,6 +93,9 @@ class Settings(BaseSettings): # SSH配置 SSH_CONFIG_PATH: str = "./data/ssh" # SSH配置目录(存储known_hosts等) + SSH_CLONE_TIMEOUT: int = 300 # SSH克隆超时时间(秒) + SSH_TEST_TIMEOUT: int = 15 # SSH测试连接超时时间(秒) + SSH_CONNECT_TIMEOUT: int = 10 # SSH连接超时时间(秒) # Agent 配置 AGENT_MAX_ITERATIONS: int = 50 # Agent 最大迭代次数 diff --git a/backend/app/services/git_ssh_service.py b/backend/app/services/git_ssh_service.py index 4d7f9fa..a5dc846 100644 --- a/backend/app/services/git_ssh_service.py +++ b/backend/app/services/git_ssh_service.py @@ -4,6 +4,9 @@ Git SSH服务 - 生成SSH密钥并使用SSH方式访问Git仓库 import os import sys +import re +import shlex +import logging import tempfile import subprocess import shutil @@ -15,6 +18,55 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519, rsa from cryptography.hazmat.backends import default_backend +# 配置日志 +logger = logging.getLogger(__name__) + + +def is_valid_branch_name(branch: str) -> bool: + """ + 验证 Git 分支名是否合法 + + Git 分支名规则: + - 不能以 . 或 - 开头 + - 不能包含 .., ~, ^, :, ?, *, [, \\, 空格 + - 不能以 / 结尾 + - 不能以 .lock 结尾 + + Args: + branch: 分支名 + + Returns: + 是否为合法的分支名 + """ + if not branch: + return False + + # 基本格式检查:只允许字母、数字、-、_、/、. + if not re.match(r'^[\w\-/.]+$', branch): + return False + + # 不能以 . 或 - 开头 + if branch.startswith('.') or branch.startswith('-'): + return False + + # 不能以 / 结尾 + if branch.endswith('/'): + return False + + # 不能以 .lock 结尾 + if branch.endswith('.lock'): + return False + + # 不能包含连续的 .. + if '..' in branch: + return False + + # 不能包含连续的 // + if '//' in branch: + return False + + return True + def get_ssh_config_dir() -> str: """ @@ -69,10 +121,10 @@ def clear_known_hosts() -> bool: # 清空文件内容 with open(known_hosts_file, 'w') as f: f.write('') - print(f"[SSH] Cleared known_hosts file: {known_hosts_file}") + logger.info(f"Cleared known_hosts file: {known_hosts_file}") return True except Exception as e: - print(f"[SSH] Failed to clear known_hosts: {e}") + logger.error(f"Failed to clear known_hosts: {e}") return False @@ -99,7 +151,7 @@ def set_secure_file_permissions(file_path: str): check=True ) except Exception as e: - print(f"Warning: Failed to set Windows file permissions: {e}") + logger.warning(f"Failed to set Windows file permissions: {e}") # 尝试使用os.chmod作为后备方案 try: os.chmod(file_path, 0o600) @@ -145,7 +197,7 @@ class SSHKeyService: return f"SHA256:{fingerprint}" except Exception as e: - print(f"[SSH] Fingerprint calculation error: {e}") + logger.error(f"Fingerprint calculation error: {e}") return None @staticmethod @@ -187,7 +239,7 @@ class SSHKeyService: backend=default_backend() ) except Exception as e: - print(f"[SSH] Failed to load private key: {e}") + logger.debug(f"Failed to load private key: {e}") return False if not private_key_obj: @@ -207,7 +259,7 @@ class SSHKeyService: return expected_public == actual_public except Exception as e: - print(f"[SSH] Key verification error: {e}") + logger.error(f"Key verification error: {e}") return False @staticmethod @@ -302,8 +354,15 @@ class GitSSHOperations: Returns: 操作结果字典 """ + from app.core.config import settings + temp_dir = None try: + # 验证分支名(如果提供) + if branch and not is_valid_branch_name(branch): + logger.warning(f"Invalid branch name rejected: {branch}") + return {'success': False, 'message': f'无效的分支名: {branch}'} + # 创建临时目录存放SSH密钥 temp_dir = tempfile.mkdtemp(prefix='deepaudit_ssh_') key_file = os.path.join(temp_dir, 'id_rsa') @@ -319,19 +378,18 @@ class GitSSHOperations: # 设置Git SSH命令,只使用DeepAudit生成的SSH密钥 env = os.environ.copy() - # 构建SSH命令(只使用DeepAudit密钥) - ssh_cmd_parts = [ - 'ssh', - '-i', key_file, - '-o', 'StrictHostKeyChecking=accept-new', # 首次连接时自动接受并保存host key - '-o', f'UserKnownHostsFile={known_hosts_file}', # 使用持久化known_hosts文件 - '-o', 'PreferredAuthentications=publickey', - '-o', 'IdentitiesOnly=yes' # 只使用指定的密钥,不使用系统默认密钥 - ] + # 构建SSH命令(使用 shlex.quote 转义路径防止命令注入) + ssh_cmd = ( + f"ssh -i {shlex.quote(key_file)} " + f"-o StrictHostKeyChecking=accept-new " + f"-o UserKnownHostsFile={shlex.quote(known_hosts_file)} " + f"-o PreferredAuthentications=publickey " + f"-o IdentitiesOnly=yes" + ) - env['GIT_SSH_COMMAND'] = ' '.join(ssh_cmd_parts) - print(f"[Git Clone] Using DeepAudit SSH key: {key_file}") - print(f"[Git Clone] Using known_hosts file: {known_hosts_file}") + env['GIT_SSH_COMMAND'] = ssh_cmd + logger.debug(f"Using SSH key file: {key_file}") + logger.debug(f"Using known_hosts file: {known_hosts_file}") # 执行git clone cmd = ['git', 'clone', '--depth', '1'] @@ -344,7 +402,7 @@ class GitSSHOperations: env=env, capture_output=True, text=True, - timeout=300 + timeout=settings.SSH_CLONE_TIMEOUT ) if result.returncode == 0: @@ -354,6 +412,7 @@ class GitSSHOperations: 'path': target_dir } else: + logger.error(f"Git clone failed: {result.stderr}") return { 'success': False, 'message': '仓库克隆失败', @@ -361,8 +420,10 @@ class GitSSHOperations: } except subprocess.TimeoutExpired: - return {'success': False, 'message': '克隆超时(超过5分钟)'} + logger.error(f"Git clone timeout after {settings.SSH_CLONE_TIMEOUT}s") + return {'success': False, 'message': f'克隆超时(超过{settings.SSH_CLONE_TIMEOUT}秒)'} except Exception as e: + logger.error(f"Git clone error: {e}") return {'success': False, 'message': f'克隆失败: {str(e)}'} finally: # 清理临时文件 @@ -429,13 +490,13 @@ class GitSSHOperations: 'content': content }) except Exception as e: - print(f"读取文件 {rel_path} 失败: {e}") + logger.debug(f"读取文件 {rel_path} 失败: {e}") continue return files except Exception as e: - print(f"获取SSH仓库文件失败: {e}") + logger.error(f"获取SSH仓库文件失败: {e}") raise finally: # 清理临时克隆目录 @@ -454,6 +515,8 @@ class GitSSHOperations: Returns: 测试结果字典 """ + from app.core.config import settings + temp_dir = None try: # 从URL提取主机 @@ -462,6 +525,11 @@ class GitSSHOperations: else: return {'success': False, 'message': 'URL格式无效'} + # 验证主机名格式(防止注入) + if not re.match(r'^[\w\-\.]+$', host_part): + logger.warning(f"Invalid host name rejected: {host_part}") + return {'success': False, 'message': '无效的主机名'} + # 创建临时目录存放密钥 temp_dir = tempfile.mkdtemp(prefix='deepaudit_ssh_test_') key_file = os.path.join(temp_dir, 'id_rsa') @@ -472,57 +540,53 @@ class GitSSHOperations: # 验证文件是否被创建 if not os.path.exists(key_file): - return {'success': False, 'message': f'私钥文件创建失败: {key_file}'} - - file_size = os.path.getsize(key_file) + return {'success': False, 'message': '私钥文件创建失败'} set_secure_file_permissions(key_file) # 使用持久化的known_hosts文件 known_hosts_file = get_known_hosts_file() - # 构建SSH命令(只使用DeepAudit密钥) + # 构建SSH命令(使用列表形式避免shell注入) cmd = [ 'ssh', '-i', key_file, - '-o', 'StrictHostKeyChecking=accept-new', # 首次连接时自动接受并保存host key - '-o', f'UserKnownHostsFile={known_hosts_file}', # 使用持久化known_hosts文件 - '-o', 'ConnectTimeout=10', + '-o', 'StrictHostKeyChecking=accept-new', + '-o', f'UserKnownHostsFile={known_hosts_file}', + '-o', f'ConnectTimeout={settings.SSH_CONNECT_TIMEOUT}', '-o', 'PreferredAuthentications=publickey', - '-o', 'IdentitiesOnly=yes', # 只使用指定的密钥,不使用系统默认密钥 - '-v', # 详细输出 + '-o', 'IdentitiesOnly=yes', + '-v', '-T', f'git@{host_part}' ] - print(f"[SSH Test] Using known_hosts file: {known_hosts_file}") + logger.debug(f"Testing SSH connection to: {host_part}") result = subprocess.run( cmd, capture_output=True, text=True, - timeout=15 + timeout=settings.SSH_TEST_TIMEOUT ) # GitHub/GitLab/CodeUp的SSH测试通常返回非0状态码,但会在输出中显示认证成功 output = result.stdout + result.stderr output_lower = output.lower() - # 特别检查Anonymous(表示公钥未添加或未关联用户账户) - # 必须在检查成功之前检查,因为Anonymous表示认证技术上成功但没有关联用户 if 'anonymous' in output_lower: return { 'success': True, 'message': 'SSH连接成功,但公钥未关联用户账户', - 'output': f'提示:服务器显示Anonymous,在使用部署密钥时是正常现象。\n请在Git服务的设置中添加SSH公钥。\n\n原始输出:\n{output}' + 'output': '提示:服务器显示Anonymous,在使用部署密钥时是正常现象。\n请在Git服务的设置中添加SSH公钥。' } # 检查是否认证成功 success_indicators = [ - ('successfully authenticated', True), # GitHub - ('hi ', True), # GitHub: "Hi username!" - ('welcome to gitlab', '@' in output), # GitLab需要有@username - ('welcome to codeup', '@' in output), # CodeUp需要有@username + ('successfully authenticated', True), + ('hi ', True), + ('welcome to gitlab', '@' in output), + ('welcome to codeup', '@' in output), ] is_success = False @@ -557,16 +621,18 @@ class GitSSHOperations: } except subprocess.TimeoutExpired: + logger.warning(f"SSH test timeout after {settings.SSH_TEST_TIMEOUT}s") return { 'success': False, - 'message': 'SSH连接超时(15秒)', + 'message': f'SSH连接超时({settings.SSH_TEST_TIMEOUT}秒)', 'output': '连接超时,请检查网络或Git服务可用性' } except Exception as e: + logger.error(f"SSH test error: {e}") return { 'success': False, - 'message': f'测试失败: {str(e)}', - 'output': str(e) + 'message': '测试失败,请稍后重试', + 'output': '' } finally: if temp_dir and os.path.exists(temp_dir):