feat(agent_tasks): 优化仓库获取逻辑,优先使用ZIP下载

重构仓库项目获取逻辑,优先尝试通过ZIP下载方式获取仓库内容,仅在失败时回退到git clone。ZIP下载方式更快更稳定,减少对git的依赖。同时改进分支尝试顺序和错误处理机制。
This commit is contained in:
lintsinghua 2025-12-16 17:38:52 +08:00
parent 5974323a71
commit 18a91f25b2
1 changed files with 244 additions and 136 deletions

View File

@ -2129,13 +2129,146 @@ async def _get_project_root(
raise RuntimeError(f"项目 ZIP 文件不存在: {project.id}")
elif project.source_type == "repository" and project.repository_url:
# 🔥 仓库项目:克隆仓库
# 🔥 仓库项目:优先使用 ZIP 下载更快更稳定git clone 作为回退
repo_url = project.repository_url
repo_type = project.repository_type or "other"
await emit(f"🔄 正在克隆仓库: {repo_url}")
await emit(f"🔄 正在获取仓库: {repo_url}")
# 检查 git 是否可用(使用 git --version 更可靠)
# 解析仓库 URL 获取 owner/repo
parsed = urlparse(repo_url)
path_parts = parsed.path.strip('/').replace('.git', '').split('/')
if len(path_parts) >= 2:
owner, repo = path_parts[0], path_parts[1]
else:
owner, repo = None, None
# 构建分支尝试顺序
branches_to_try = []
if branch_name:
branches_to_try.append(branch_name)
if project.default_branch and project.default_branch not in branches_to_try:
branches_to_try.append(project.default_branch)
for common_branch in ["main", "master"]:
if common_branch not in branches_to_try:
branches_to_try.append(common_branch)
download_success = False
last_error = ""
# ============ 方案1: 优先使用 ZIP 下载(更快更稳定)============
if owner and repo:
import httpx
for branch in branches_to_try:
check_cancelled()
# 清理目录
if os.path.exists(base_path) and os.listdir(base_path):
shutil.rmtree(base_path)
os.makedirs(base_path, exist_ok=True)
# 构建 ZIP 下载 URL
if repo_type == "github" or "github.com" in repo_url:
# GitHub ZIP 下载 URL
zip_url = f"https://github.com/{owner}/{repo}/archive/refs/heads/{branch}.zip"
headers = {}
if github_token:
headers["Authorization"] = f"token {github_token}"
elif repo_type == "gitlab" or "gitlab" in repo_url:
# GitLab ZIP 下载 URL需要对 owner/repo 进行 URL 编码)
import urllib.parse
project_path = urllib.parse.quote(f"{owner}/{repo}", safe='')
gitlab_host = parsed.netloc
zip_url = f"https://{gitlab_host}/api/v4/projects/{project_path}/repository/archive.zip?sha={branch}"
headers = {}
if gitlab_token:
headers["PRIVATE-TOKEN"] = gitlab_token
else:
# 其他平台,跳过 ZIP 下载
break
logger.info(f"📦 尝试下载 ZIP 归档 (分支: {branch})...")
await emit(f"📦 尝试下载 ZIP 归档 (分支: {branch})")
try:
zip_temp_path = f"/tmp/repo_{task_id}_{branch}.zip"
async def download_zip():
async with httpx.AsyncClient(timeout=60.0, follow_redirects=True) as client:
resp = await client.get(zip_url, headers=headers)
if resp.status_code == 200:
with open(zip_temp_path, 'wb') as f:
f.write(resp.content)
return True, None
else:
return False, f"HTTP {resp.status_code}"
# 使用取消检查循环
download_task = asyncio.create_task(download_zip())
while not download_task.done():
check_cancelled()
try:
success, error = await asyncio.wait_for(asyncio.shield(download_task), timeout=1.0)
break
except asyncio.TimeoutError:
continue
if download_task.done():
success, error = download_task.result()
if success and os.path.exists(zip_temp_path):
# 解压 ZIP
check_cancelled()
with zipfile.ZipFile(zip_temp_path, 'r') as zip_ref:
# ZIP 内通常有一个根目录如 repo-branch/
file_list = zip_ref.namelist()
# 找到公共前缀
if file_list:
common_prefix = file_list[0].split('/')[0] + '/'
for i, file_name in enumerate(file_list):
if i % 50 == 0:
check_cancelled()
# 去掉公共前缀
if file_name.startswith(common_prefix):
target_path = file_name[len(common_prefix):]
if target_path: # 跳过空路径(根目录本身)
full_target = os.path.join(base_path, target_path)
if file_name.endswith('/'):
os.makedirs(full_target, exist_ok=True)
else:
os.makedirs(os.path.dirname(full_target), exist_ok=True)
with zip_ref.open(file_name) as src, open(full_target, 'wb') as dst:
dst.write(src.read())
# 清理临时文件
os.remove(zip_temp_path)
logger.info(f"✅ ZIP 下载成功 (分支: {branch})")
await emit(f"✅ 仓库获取成功 (ZIP下载, 分支: {branch})")
download_success = True
break
else:
last_error = error or "下载失败"
logger.warning(f"ZIP 下载失败 (分支 {branch}): {last_error}")
await emit(f"⚠️ ZIP 下载失败,尝试其他分支...", "warning")
# 清理临时文件
if os.path.exists(zip_temp_path):
os.remove(zip_temp_path)
except asyncio.CancelledError:
logger.info(f"[Cancel] ZIP download cancelled for task {task_id}")
raise
except Exception as e:
last_error = str(e)
logger.warning(f"ZIP 下载异常 (分支 {branch}): {e}")
await emit(f"⚠️ ZIP 下载异常: {str(e)[:50]}...", "warning")
# ============ 方案2: 回退到 git clone ============
if not download_success:
await emit(f"🔄 ZIP 下载失败,回退到 Git 克隆...")
logger.info("ZIP download failed, falling back to git clone")
# 检查 git 是否可用
try:
git_check = subprocess.run(
["git", "--version"],
@ -2145,19 +2278,17 @@ async def _get_project_root(
)
if git_check.returncode != 0:
await emit(f"❌ Git 未安装", "error")
raise RuntimeError("Git 未安装,无法克隆仓库。请在 Docker 容器中安装 git。")
logger.debug(f"Git version: {git_check.stdout.strip()}")
raise RuntimeError("Git 未安装,无法克隆仓库。")
except FileNotFoundError:
await emit(f"❌ Git 未安装", "error")
raise RuntimeError("Git 未安装,无法克隆仓库。请在 Docker 容器中安装 git")
raise RuntimeError("Git 未安装,无法克隆仓库")
except subprocess.TimeoutExpired:
await emit(f"❌ Git 检测超时", "error")
raise RuntimeError("Git 检测超时")
# 构建带认证的 URL(用于私有仓库)
# 构建带认证的 URL
auth_url = repo_url
if repo_type == "github" and github_token:
parsed = urlparse(repo_url)
auth_url = urlunparse((
parsed.scheme,
f"{github_token}@{parsed.netloc}",
@ -2166,10 +2297,8 @@ async def _get_project_root(
parsed.query,
parsed.fragment
))
logger.info(f"🔐 Using GitHub token for authentication")
await emit(f"🔐 使用 GitHub Token 认证")
elif repo_type == "gitlab" and gitlab_token:
parsed = urlparse(repo_url)
auth_url = urlunparse((
parsed.scheme,
f"oauth2:{gitlab_token}@{parsed.netloc}",
@ -2178,36 +2307,18 @@ async def _get_project_root(
parsed.query,
parsed.fragment
))
logger.info(f"🔐 Using GitLab token for authentication")
await emit(f"🔐 使用 GitLab Token 认证")
# 构建分支尝试顺序
branches_to_try = []
if branch_name:
branches_to_try.append(branch_name)
if project.default_branch and project.default_branch not in branches_to_try:
branches_to_try.append(project.default_branch)
# 添加常见默认分支
for common_branch in ["main", "master"]:
if common_branch not in branches_to_try:
branches_to_try.append(common_branch)
clone_success = False
last_error = ""
for branch in branches_to_try:
# 🔥 每次尝试前检查取消
check_cancelled()
# 清理目录(如果之前尝试失败)
if os.path.exists(base_path) and os.listdir(base_path):
shutil.rmtree(base_path)
os.makedirs(base_path, exist_ok=True)
logger.info(f"🔄 Trying to clone repository (branch: {branch})...")
logger.info(f"🔄 尝试克隆分支: {branch}")
await emit(f"🔄 尝试克隆分支: {branch}")
# 🔥 使用 asyncio 包装 subprocess支持取消
try:
async def run_clone():
return await asyncio.to_thread(
@ -2218,7 +2329,6 @@ async def _get_project_root(
timeout=120,
)
# 🔥 使用 wait_for 添加取消检查循环
clone_task = asyncio.create_task(run_clone())
while not clone_task.done():
check_cancelled()
@ -2232,27 +2342,27 @@ async def _get_project_root(
result = clone_task.result()
if result.returncode == 0:
logger.info(f"Cloned repository {repo_url} (branch: {branch}) to {base_path}")
await emit(f"✅ 仓库克隆成功 (分支: {branch})")
clone_success = True
logger.info(f"Git 克隆成功 (分支: {branch})")
await emit(f"✅ 仓库获取成功 (Git克隆, 分支: {branch})")
download_success = True
break
else:
last_error = result.stderr
logger.warning(f"Failed to clone branch {branch}: {last_error[:200]}")
await emit(f"⚠️ 分支 {branch} 克隆失败,尝试其他分支...", "warning")
logger.warning(f"克隆失败 (分支 {branch}): {last_error[:200]}")
await emit(f"⚠️ 分支 {branch} 克隆失败...", "warning")
except subprocess.TimeoutExpired:
last_error = f"克隆分支 {branch} 超时"
logger.warning(last_error)
await emit(f"⚠️ 分支 {branch} 克隆超时,尝试其他分支...", "warning")
await emit(f"⚠️ 分支 {branch} 克隆超时...", "warning")
except asyncio.CancelledError:
logger.info(f"[Cancel] Git clone cancelled for task {task_id}")
raise
# 如果所有分支都失败,尝试不指定分支克隆(使用仓库默认分支
if not clone_success:
check_cancelled() # 🔥 检查取消
logger.info(f"🔄 Trying to clone without specifying branch...")
await emit(f"🔄 尝试使用仓库默认分支克隆...")
# 尝试默认分支
if not download_success:
check_cancelled()
await emit(f"🔄 尝试使用仓库默认分支...")
if os.path.exists(base_path) and os.listdir(base_path):
shutil.rmtree(base_path)
os.makedirs(base_path, exist_ok=True)
@ -2267,7 +2377,6 @@ async def _get_project_root(
timeout=120,
)
# 🔥 使用 wait_for 添加取消检查循环
clone_task = asyncio.create_task(run_default_clone())
while not clone_task.done():
check_cancelled()
@ -2281,19 +2390,18 @@ async def _get_project_root(
result = clone_task.result()
if result.returncode == 0:
logger.info(f"Cloned repository {repo_url} (default branch) to {base_path}")
await emit(f"✅ 仓库克隆成功 (默认分支)")
clone_success = True
logger.info(f"Git 克隆成功 (默认分支)")
await emit(f"✅ 仓库获取成功 (Git克隆, 默认分支)")
download_success = True
else:
last_error = result.stderr
except subprocess.TimeoutExpired:
last_error = "克隆仓库超时"
await emit(f"⚠️ 克隆超时", "warning")
last_error = "克隆超时"
except asyncio.CancelledError:
logger.info(f"[Cancel] Git clone cancelled for task {task_id}")
raise
if not clone_success:
if not download_success:
# 分析错误原因
error_msg = "克隆仓库失败"
if "Authentication failed" in last_error or "401" in last_error: