Update merge.py

Bilingual Documentation: Added comprehensive Chinese-English bilingual comments and function documentation to improve international usability.
zhipeng tang 2025-04-01 15:14:11 +08:00 committed by GitHub
parent 6b2e9448a2
commit 78e988c33b
1 changed file with 191 additions and 246 deletions


@@ -11,7 +11,16 @@ import logging
def load_jsonl(file_path):
"""Load data from a JSONL file."""
"""
从JSONL文件加载数据
(Load data from a JSONL file)
Args:
file_path (str): JSONL文件路径 (Path to the JSONL file)
Returns:
list: 包含文件中每行JSON对象的列表 (List containing JSON objects from each line)
"""
data = []
# Special handling for episodes_stats.jsonl
@@ -56,13 +65,30 @@ def load_jsonl(file_path):
return data
def save_jsonl(data, file_path):
"""Save data to a JSONL file."""
"""
将数据保存为JSONL格式
(Save data in JSONL format)
Args:
data (list): 要保存的JSON对象列表 (List of JSON objects to save)
file_path (str): 输出文件路径 (Path to the output file)
"""
with open(file_path, 'w') as f:
for item in data:
f.write(json.dumps(item) + '\n')
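A quick illustration of the JSONL round-trip these two helpers provide (a minimal sketch; the file names and the appended record are made up):

records = load_jsonl("meta/episodes.jsonl")        # one JSON object per line
records.append({"episode_index": len(records)})    # hypothetical new entry
save_jsonl(records, "meta/episodes_merged.jsonl")  # written back, one object per line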
def merge_stats(stats_list):
"""Merge statistics from multiple episodes."""
"""
合并多个数据集的统计信息确保维度一致性
(Merge statistics from multiple datasets, ensuring dimensional consistency)
Args:
stats_list (list): 包含每个数据集统计信息的字典列表
(List of dictionaries containing statistics for each dataset)
Returns:
dict: 合并后的统计信息 (Merged statistics)
"""
# Initialize merged stats with the structure of the first stats
merged_stats = {}
@@ -284,7 +310,16 @@ def merge_stats(stats_list):
return merged_stats
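The body of merge_stats is elided in this diff. The sketch below shows one way per-feature statistics could be combined across datasets, purely for illustration; the key names ("min", "max", "mean", "count") are assumptions, and the real function additionally reconciles vector dimensions:

import numpy as np

def combine_feature_stats(entries):
    # entries: one stats dict per dataset, e.g. {"min": [...], "max": [...], "mean": [...], "count": [n]}
    counts = np.array([e["count"][0] for e in entries], dtype=float)
    merged_min = np.min([np.asarray(e["min"]) for e in entries], axis=0)
    merged_max = np.max([np.asarray(e["max"]) for e in entries], axis=0)
    means = np.stack([np.asarray(e["mean"]) for e in entries])
    merged_mean = (means * counts[:, None]).sum(axis=0) / counts.sum()  # count-weighted mean
    return {"min": merged_min.tolist(), "max": merged_max.tolist(),
            "mean": merged_mean.tolist(), "count": [int(counts.sum())]}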
def copy_videos(source_folders, output_folder, episode_mapping):
"""Copy video files from source folders to output folder with updated episode indices."""
"""
从源文件夹复制视频文件到输出文件夹保持正确的索引和结构
(Copy video files from source folders to output folder, maintaining correct indices and structure)
Args:
source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths)
output_folder (str): 输出文件夹路径 (Output folder path)
episode_mapping (list): 包含(旧文件夹,旧索引,新索引)元组的列表
(List of tuples containing (old_folder, old_index, new_index))
"""
# Get info.json to determine video structure
info_path = os.path.join(source_folders[0], 'meta', 'info.json')
with open(info_path, 'r') as f:
@@ -395,13 +430,24 @@ def copy_videos(source_folders, output_folder, episode_mapping):
print(f"Warning: Video file not found for {video_key}, episode {old_index} in {old_folder}")
def validate_timestamps(source_folders, tolerance_s=1e-4):
"""验证源数据集的时间戳结构,识别潜在问题"""
"""
验证源数据集的时间戳结构识别潜在问题
(Validate timestamp structure of source datasets, identify potential issues)
Args:
source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths)
tolerance_s (float): 时间戳不连续性的容差值以秒为单位 (Tolerance for timestamp discontinuities in seconds)
Returns:
tuple: (issues, fps_values) - 问题列表和检测到的FPS值列表
(List of issues and list of detected FPS values)
"""
issues = []
fps_values = []
for folder in source_folders:
try:
# 尝试从 info.json 获取 FPS
# 尝试从 info.json 获取 FPS (Try to get FPS from info.json)
info_path = os.path.join(folder, 'meta', 'info.json')
if os.path.exists(info_path):
with open(info_path, 'r') as f:
@@ -409,9 +455,9 @@ def validate_timestamps(source_folders, tolerance_s=1e-4):
if 'fps' in info:
fps = info['fps']
fps_values.append(fps)
print(f"数据集 {folder} FPS={fps}")
print(f"数据集 {folder} FPS={fps} (Dataset {folder} FPS={fps})")
# 检查是否有parquet文件包含时间戳
# 检查是否有parquet文件包含时间戳 (Check if any parquet files contain timestamps)
parquet_path = None
for root, _, files in os.walk(os.path.join(folder, "parquet")):
for file in files:
@@ -434,86 +480,68 @@ def validate_timestamps(source_folders, tolerance_s=1e-4):
df = pd.read_parquet(parquet_path)
timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col]
if timestamp_cols:
print(f"数据集 {folder} 包含时间戳列: {timestamp_cols}")
print(f"数据集 {folder} 包含时间戳列: {timestamp_cols} (Dataset {folder} contains timestamp columns: {timestamp_cols})")
else:
issues.append(f"警告: 数据集 {folder} 没有时间戳列")
issues.append(f"警告: 数据集 {folder} 没有时间戳列 (Warning: Dataset {folder} has no timestamp columns)")
else:
issues.append(f"警告: 数据集 {folder} 未找到parquet文件")
issues.append(f"警告: 数据集 {folder} 未找到parquet文件 (Warning: No parquet files found in dataset {folder})")
except Exception as e:
issues.append(f"错误: 验证数据集 {folder} 失败: {e}")
print(f"验证错误: {e}")
issues.append(f"错误: 验证数据集 {folder} 失败: {e} (Error: Failed to validate dataset {folder}: {e})")
print(f"验证错误: {e} (Validation error: {e})")
traceback.print_exc()
# 检查FPS是否一致
# 检查FPS是否一致 (Check if FPS values are consistent)
if len(set(fps_values)) > 1:
issues.append(f"警告: 数据集FPS不一致: {fps_values}")
issues.append(f"警告: 数据集FPS不一致: {fps_values} (Warning: Inconsistent FPS across datasets: {fps_values})")
return issues, fps_values
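As shown in this diff, validate_timestamps checks that timestamp columns exist and that FPS values agree across datasets. A minimal sketch of the kind of per-frame gap check that tolerance_s suggests (the column name "timestamp" is an assumption for illustration):

import numpy as np
import pandas as pd

def find_timestamp_gaps(df: pd.DataFrame, fps: float, tolerance_s: float = 1e-4):
    # Flag frame-to-frame gaps that deviate from the nominal 1/fps spacing
    diffs = np.diff(df["timestamp"].to_numpy())
    return np.where(np.abs(diffs - 1.0 / fps) > tolerance_s)[0]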
def normalize_timestamps_df(df, fps=None, base_time=None, episode_idx=0, dataset_idx=0, default_fps=20):
"""
规范化DataFrame中的时间戳强制创建连续时间序列同时考虑数据集差异
(Normalize timestamps in the DataFrame, forcing a continuous time series while accounting for dataset differences)
"""
normalized_df = df.copy()
# 找出所有时间戳列 (Find all timestamp columns)
timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col]
if not timestamp_cols:
return normalized_df # 没有时间戳列,直接返回 (No timestamp columns, return as-is)
# 如果未提供fps使用默认值 (If fps is not provided, use the default)
if fps is None:
fps = default_fps
# 确定基准时间 (Determine the base time)
if base_time is None:
# 使用当前时间作为基准,避免使用可能有问题的原始时间戳 (Use the current time as the base to avoid potentially problematic original timestamps)
base_time = datetime.now().timestamp()
# 为每个数据集添加一个较大的时间偏移例如每个数据集间隔1天 (Add a large time offset per dataset, e.g. a 1-day gap between datasets)
dataset_offset = dataset_idx * 86400 # 86400秒 = 1天 (86400 seconds = 1 day)
# 为每个episode计算唯一的起始时间避免重叠 (Compute a unique start time for each episode to avoid overlap)
frame_interval = 1.0 / fps
episode_duration = len(df) * frame_interval
# 添加10帧的缓冲确保episodes之间有间隔 (Add a 10-frame buffer so episodes do not overlap)
start_time = base_time + dataset_offset + episode_idx * (episode_duration + 10 * frame_interval)
print(f"数据集 {dataset_idx}, Episode {episode_idx}: 创建连续时间戳,起始于 {start_time}, 间隔 {frame_interval} (Dataset {dataset_idx}, Episode {episode_idx}: creating continuous timestamps starting at {start_time}, interval {frame_interval})")
# 对每个时间戳列创建严格递增的时间序列 (Create a strictly increasing time series for each timestamp column)
for col in timestamp_cols:
normalized_df[col] = [start_time + i * frame_interval for i in range(len(df))]
return normalized_df
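Worked example of the start-time arithmetic above (illustrative values: fps=20, a 100-frame episode, dataset_idx=1, episode_idx=2):

# frame_interval   = 1.0 / 20                                   = 0.05 s
# episode_duration = 100 * 0.05                                 = 5.0 s
# dataset_offset   = 1 * 86400                                  = 86400 s (one day)
# start_time       = base_time + 86400 + 2 * (5.0 + 10 * 0.05)  = base_time + 86411.0 s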
def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, fps=None,
episode_to_frame_index=None, folder_task_mapping=None, chunks_size=1000, default_fps=20):
"""复制parquet数据文件"""
# 获取第一个数据集的FPS如果未提供
"""
复制并处理parquet数据文件包括维度填充和索引更新
(Copy and process parquet data files, including dimension padding and index updates)
Args:
source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths)
output_folder (str): 输出文件夹路径 (Output folder path)
episode_mapping (list): 包含(旧文件夹,旧索引,新索引)元组的列表
(List of tuples containing (old_folder, old_index, new_index))
max_dim (int): 向量的最大维度 (Maximum dimension for vectors)
fps (float, optional): 帧率如果未提供则从第一个数据集获取 (Frame rate, if not provided will be obtained from the first dataset)
episode_to_frame_index (dict, optional): 每个新episode索引对应的起始帧索引映射
(Mapping of each new episode index to its starting frame index)
folder_task_mapping (dict, optional): 每个文件夹中task_index的映射关系
(Mapping of task_index for each folder)
chunks_size (int): 每个chunk包含的episode数量 (Number of episodes per chunk)
default_fps (float): 默认帧率当无法从数据集获取时使用 (Default frame rate when unable to obtain from dataset)
Returns:
bool: 是否成功复制了至少一个文件 (Whether at least one file was successfully copied)
"""
# 获取第一个数据集的FPS如果未提供(Get FPS from first dataset if not provided)
if fps is None:
info_path = os.path.join(source_folders[0], 'meta', 'info.json')
if os.path.exists(info_path):
with open(info_path, 'r') as f:
info = json.load(f)
fps = info.get('fps', default_fps) # 使用变量替代硬编码的20
fps = info.get('fps', default_fps) # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20)
else:
fps = default_fps # 使用变量替代硬编码的20
fps = default_fps # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20)
print(f"使用FPS={fps}")
print(f"使用FPS={fps} (Using FPS={fps})")
# 为每个episode复制和处理数据文件
# 为每个episode复制和处理数据文件 (Copy and process data files for each episode)
total_copied = 0
total_failed = 0
# 添加一个列表来记录失败的文件及原因
# 添加一个列表来记录失败的文件及原因
# (Add a list to record failed files and reasons)
failed_files = []
for i, (old_folder, old_index, new_index) in enumerate(episode_mapping):
# 尝试找到源parquet文件
# 尝试找到源parquet文件 (Try to find source parquet file)
episode_str = f"episode_{old_index:06d}.parquet"
source_paths = [
os.path.join(old_folder, "parquet", episode_str),
@@ -528,22 +556,19 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18,
if source_path:
try:
# 读取parquet文件
# 读取parquet文件 (Read parquet file)
df = pd.read_parquet(source_path)
# 注释掉时间戳规范化 (Timestamp normalization commented out)
# df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx)
# 检查是否需要填充维度
# 检查是否需要填充维度 (Check if dimensions need padding)
for feature in ['observation.state', 'action']:
if feature in df.columns:
# 检查第一个非空值
# 检查第一个非空值 (Check first non-null value)
for idx, value in enumerate(df[feature]):
if value is not None and isinstance(value, (list, np.ndarray)):
current_dim = len(value)
if current_dim < max_dim:
print(f"填充 {feature}{current_dim} 维到 {max_dim}")
# 使用零填充到目标维度
print(f"填充 {feature}{current_dim} 维到 {max_dim} (Padding {feature} from {current_dim} to {max_dim} dimensions)")
# 使用零填充到目标维度 (Pad with zeros to target dimension)
df[feature] = df[feature].apply(
lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist()
if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim
@@ -551,59 +576,57 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18,
)
break
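# Worked example of the zero-padding above (illustrative): with max_dim=18, a 14-dim
# vector [q0, ..., q13] becomes [q0, ..., q13, 0.0, 0.0, 0.0, 0.0]; values that are
# already max_dim long (or are not list/ndarray) are left untouched by the lambda.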
# 确保目标目录存在 (Ensure target directory exists)
os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True)
# 更新episode_index列
# 更新episode_index列 (Update episode_index column)
if 'episode_index' in df.columns:
print(f"更新episode_index从 {df['episode_index'].iloc[0]}{new_index}")
print(f"更新episode_index从 {df['episode_index'].iloc[0]}{new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})")
df['episode_index'] = new_index
# 更新index列
# 更新index列 (Update index column)
if 'index' in df.columns:
if episode_to_frame_index and new_index in episode_to_frame_index:
# 使用预先计算的帧索引起始值
# 使用预先计算的帧索引起始值 (Use pre-calculated frame index start value)
first_index = episode_to_frame_index[new_index]
print(f"更新index列起始值: {first_index}(使用全局累积帧计数)")
print(f"更新index列起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))")
else:
# 如果没有提供映射,使用当前的计算方式作为回退
# (If no mapping provided, use current calculation as fallback)
first_index = new_index * len(df)
print(f"更新index列起始值: {first_index}使用episode索引乘以长度")
print(f"更新index列起始值: {first_index}使用episode索引乘以长度(Update index column, start value: {first_index} (using episode index multiplied by length))")
# 更新所有帧的索引
# 更新所有帧的索引 (Update indices for all frames)
df['index'] = [first_index + i for i in range(len(df))]
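# Illustrative numbers: if episode_to_frame_index maps new_index to 730 and this episode
# has 250 frames, the global 'index' column becomes 730, 731, ..., 979. The fallback
# new_index * len(df) is only exact when every episode has the same number of frames.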
# 更新task_index列
# 更新task_index列 (Update task_index column)
if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping:
# 获取当前task_index
# 获取当前task_index (Get current task_index)
current_task_index = df['task_index'].iloc[0]
# 检查是否有对应的新索引
# 检查是否有对应的新索引 (Check if there's a corresponding new index)
if current_task_index in folder_task_mapping[old_folder]:
new_task_index = folder_task_mapping[old_folder][current_task_index]
print(f"更新task_index从 {current_task_index}{new_task_index}")
print(f"更新task_index从 {current_task_index}{new_task_index} (Update task_index from {current_task_index} to {new_task_index})")
df['task_index'] = new_task_index
else:
print(f"警告: 找不到task_index {current_task_index}的映射关系")
print(f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})")
# 计算chunk编号
# 计算chunk编号 (Calculate chunk number)
chunk_index = new_index // chunks_size
# 创建正确的目标目录
# 创建正确的目标目录 (Create correct target directory)
chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}")
os.makedirs(chunk_dir, exist_ok=True)
# 构建正确的目标路径
# 构建正确的目标路径 (Build correct target path)
dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet")
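# Illustrative path: with chunks_size=1000 and new_index=1234, chunk_index is 1 and the
# file is written to <output_folder>/data/chunk-001/episode_001234.parquet.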
# 保存到正确位置
# 保存到正确位置 (Save to correct location)
df.to_parquet(dest_path, index=False)
total_copied += 1
print(f"已处理并保存: {dest_path}")
print(f"已处理并保存: {dest_path} (Processed and saved: {dest_path})")
except Exception as e:
error_msg = f"处理 {source_path} 失败: {e}"
error_msg = f"处理 {source_path} 失败: {e} (Processing {source_path} failed: {e})"
print(error_msg)
traceback.print_exc()
failed_files.append({"file": source_path, "reason": str(e), "episode": old_index})
@@ -617,66 +640,78 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18,
try:
source_path = os.path.join(root, file)
# 读取parquet文件
# 读取parquet文件 (Read parquet file)
df = pd.read_parquet(source_path)
# 注释掉时间戳规范化 (Timestamp normalization commented out)
# df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx)
# 检查是否需要填充维度 (Check if dimensions need padding)
for feature in ['observation.state', 'action']:
if feature in df.columns:
# 检查第一个非空值 (Check first non-null value)
for idx, value in enumerate(df[feature]):
if value is not None and isinstance(value, (list, np.ndarray)):
current_dim = len(value)
if current_dim < max_dim:
print(f"填充 {feature}{current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)")
# 使用零填充到目标维度 (Pad with zeros to target dimension)
df[feature] = df[feature].apply(
lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist()
if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim
else x
)
break
# 确保目标目录存在 (Ensure target directory exists)
os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True)
# 更新episode_index列
# 更新episode_index列 (Update episode_index column)
if 'episode_index' in df.columns:
print(f"更新episode_index从 {df['episode_index'].iloc[0]}{new_index}")
print(f"更新episode_index从 {df['episode_index'].iloc[0]}{new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})")
df['episode_index'] = new_index
# 更新index列
# 更新index列 (Update index column)
if 'index' in df.columns:
if episode_to_frame_index and new_index in episode_to_frame_index:
# 使用预先计算的帧索引起始值
# 使用预先计算的帧索引起始值 (Use pre-calculated frame index start value)
first_index = episode_to_frame_index[new_index]
print(f"更新index列起始值: {first_index}(使用全局累积帧计数)")
print(f"更新index列起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))")
else:
# 如果没有提供映射,使用当前的计算方式作为回退
# (If no mapping provided, use current calculation as fallback)
first_index = new_index * len(df)
print(f"更新index列起始值: {first_index}使用episode索引乘以长度")
print(f"更新index列起始值: {first_index}使用episode索引乘以长度(Update index column, start value: {first_index} (using episode index multiplied by length))")
# 更新所有帧的索引
# 更新所有帧的索引 (Update indices for all frames)
df['index'] = [first_index + i for i in range(len(df))]
# 更新task_index列
# 更新task_index列 (Update task_index column)
if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping:
# 获取当前task_index
# 获取当前task_index (Get current task_index)
current_task_index = df['task_index'].iloc[0]
# 检查是否有对应的新索引
# 检查是否有对应的新索引 (Check if there's a corresponding new index)
if current_task_index in folder_task_mapping[old_folder]:
new_task_index = folder_task_mapping[old_folder][current_task_index]
print(f"更新task_index从 {current_task_index}{new_task_index}")
print(f"更新task_index从 {current_task_index}{new_task_index} (Update task_index from {current_task_index} to {new_task_index})")
df['task_index'] = new_task_index
else:
print(f"警告: 找不到task_index {current_task_index}的映射关系")
print(f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})")
# 计算chunk编号
# 计算chunk编号 (Calculate chunk number)
chunk_index = new_index // chunks_size
# 创建正确的目标目录
# 创建正确的目标目录 (Create correct target directory)
chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}")
os.makedirs(chunk_dir, exist_ok=True)
# 构建正确的目标路径
# 构建正确的目标路径 (Build correct target path)
dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet")
# 保存到正确位置
# 保存到正确位置 (Save to correct location)
df.to_parquet(dest_path, index=False)
total_copied += 1
found = True
print(f"已处理并保存: {dest_path}")
print(f"已处理并保存: {dest_path} (Processed and saved: {dest_path})")
break
except Exception as e:
error_msg = f"处理 {source_path} 失败: {e}"
error_msg = f"处理 {source_path} 失败: {e} (Processing {source_path} failed: {e})"
print(error_msg)
traceback.print_exc()
failed_files.append({"file": source_path, "reason": str(e), "episode": old_index})
@@ -692,132 +727,30 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18,
print(f"共复制 {total_copied} 个数据文件,{total_failed} 个失败")
# 打印所有失败的文件详情
# 打印所有失败的文件详情 (Print details of all failed files)
if failed_files:
print("\n失败的文件详情:")
print("\n失败的文件详情 (Details of failed files):")
for i, failed in enumerate(failed_files):
print(f"{i+1}. 文件: {failed['file']}")
print(f"{i+1}. 文件 (File): {failed['file']}")
if 'folder' in failed:
print(f" 文件夹: {failed['folder']}")
print(f" 文件夹 (Folder): {failed['folder']}")
if 'episode' in failed:
print(f" Episode索引: {failed['episode']}")
print(f" 原因: {failed['reason']}")
print(f" Episode索引 (Episode index): {failed['episode']}")
print(f" 原因 (Reason): {failed['reason']}")
print("---")
return total_copied > 0
def copy_data_files_bak(source_folders, output_folder, episode_mapping):
"""Copy data files from source folders to output folder with updated episode indices."""
# Get info.json to determine data structure
info_path = os.path.join(source_folders[0], 'meta', 'info.json')
with open(info_path, 'r') as f:
info = json.load(f)
data_path_template = info['data_path']
# Process each episode mapping
for old_folder, old_index, new_index in episode_mapping:
# Calculate chunk indices
old_episode_chunk = old_index // info['chunks_size']
new_episode_chunk = new_index // info['chunks_size']
# Construct source path
source_data_path = os.path.join(
old_folder,
data_path_template.format(
episode_chunk=old_episode_chunk,
episode_index=old_index
)
)
# Construct destination path
dest_data_path = os.path.join(
output_folder,
data_path_template.format(
episode_chunk=new_episode_chunk,
episode_index=new_index
)
)
# Create destination directory if it doesn't exist
os.makedirs(os.path.dirname(dest_data_path), exist_ok=True)
# Check if source file exists
if os.path.exists(source_data_path):
print(f"Copying data: {source_data_path} -> {dest_data_path}")
# Check if we need to pad dimensions
try:
# Read the source parquet file to check dimensions
df = pd.read_parquet(source_data_path)
# Check if observation.state is a vector and needs padding
needs_padding = False
if 'observation.state' in df.columns:
first_state = df['observation.state'].iloc[0]
if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18:
needs_padding = True
if needs_padding:
# This needs padding to 18-dim
pad_parquet_data(source_data_path, dest_data_path, len(first_state), 18)
else:
# Just copy the file
shutil.copy2(source_data_path, dest_data_path)
except Exception as e:
print(f"Error processing {source_data_path}: {e}")
# Fall back to simple copy
shutil.copy2(source_data_path, dest_data_path)
else:
print(f"Warning: {source_data_path} not found, searching for parquet files in {old_folder}")
# Try to find the file by searching
found = False
for root, dirs, files in os.walk(old_folder):
for file in files:
if file.endswith('.parquet'):
source_data_path = os.path.join(root, file)
print(f"Found parquet file: {source_data_path}")
# Create a destination path based on the found file
rel_path = os.path.relpath(source_data_path, old_folder)
dest_data_path = os.path.join(output_folder, rel_path)
# Ensure destination directory exists
os.makedirs(os.path.dirname(dest_data_path), exist_ok=True)
# Check if we need to pad dimensions
try:
# Read the source parquet file to check dimensions
df = pd.read_parquet(source_data_path)
# Check if observation.state is a vector and needs padding
needs_padding = False
if 'observation.state' in df.columns:
first_state = df['observation.state'].iloc[0]
if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18:
needs_padding = True
if needs_padding:
# This needs padding to 18-dim
pad_parquet_data(source_data_path, dest_data_path, len(first_state), 18)
else:
# Just copy the file
shutil.copy2(source_data_path, dest_data_path)
found = True
except Exception as e:
print(f"Error processing {source_data_path}: {e}")
# Fall back to simple copy
shutil.copy2(source_data_path, dest_data_path)
found = True
if not found:
print(f"Warning: Could not find any parquet file in {old_folder}")
def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18):
"""
读取parquet文件将数据从original_dim维填充到target_dim维
填充的值为0然后保存到新的parquet文件
通过零填充将parquet数据从原始维度扩展到目标维度
(Extend parquet data from original dimension to target dimension by zero-padding)
Args:
source_path (str): 源parquet文件路径 (Source parquet file path)
target_path (str): 目标parquet文件路径 (Target parquet file path)
original_dim (int): 原始向量维度 (Original vector dimension)
target_dim (int): 目标向量维度 (Target vector dimension)
"""
# 读取parquet文件 (Read parquet file)
df = pd.read_parquet(source_path)
@@ -877,7 +810,26 @@ def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18):
return new_df
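A minimal, self-contained sketch of the padding idea pad_parquet_data implements (the column name is taken from the code above; the sample values are made up):

import numpy as np
import pandas as pd

df = pd.DataFrame({"observation.state": [np.arange(14, dtype=float).tolist()]})
df["observation.state"] = df["observation.state"].apply(
    lambda v: np.pad(np.asarray(v), (0, 18 - len(v)), "constant").tolist() if len(v) < 18 else v
)
print(len(df["observation.state"].iloc[0]))  # -> 18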
def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, max_dim=18, default_fps=20):
"""Merge multiple dataset folders into one."""
"""
将多个数据集文件夹合并为一个处理索引维度和元数据
(Merge multiple dataset folders into one, handling indices, dimensions, and metadata)
Args:
source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths)
output_folder (str): 输出文件夹路径 (Output folder path)
validate_ts (bool): 是否验证时间戳 (Whether to validate timestamps)
tolerance_s (float): 时间戳不连续性的容差值以秒为单位 (Tolerance for timestamp discontinuities in seconds)
max_dim (int): 向量的最大维度 (Maximum dimension for vectors)
default_fps (float): 默认帧率 (Default frame rate)
这个函数执行以下操作:
(This function performs the following operations:)
1. 合并所有的episodestasks和stats (Merges all episodes, tasks and stats)
2. 重新编号所有的索引以保持连续性 (Renumbers all indices to maintain continuity)
3. 填充向量维度使其一致 (Pads vector dimensions for consistency)
4. 更新元数据文件 (Updates metadata files)
5. 复制并处理数据和视频文件 (Copies and processes data and video files)
"""
# Create output folder if it doesn't exist
os.makedirs(output_folder, exist_ok=True)
os.makedirs(os.path.join(output_folder, 'meta'), exist_ok=True)
@@ -940,12 +892,13 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
info = json.load(f)
chunks_size = info.get('chunks_size', 1000)
# 使用更简单的方法计算视频总数
# 使用更简单的方法计算视频总数 (Use simpler method to calculate total videos)
total_videos = 0
for folder in source_folders:
try:
# 从每个数据集的info.json直接获取total_videos
# (Get total_videos directly from each dataset's info.json)
folder_info_path = os.path.join(folder, 'meta', 'info.json')
if os.path.exists(folder_info_path):
with open(folder_info_path, 'r') as f:
@@ -953,7 +906,7 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
if 'total_videos' in folder_info:
folder_videos = folder_info['total_videos']
total_videos += folder_videos
print(f"{folder}的info.json中读取到视频数量: {folder_videos}")
print(f"{folder}的info.json中读取到视频数量: {folder_videos} (Read video count from {folder}'s info.json: {folder_videos})")
# Check dimensions of state vectors in this folder
folder_dim = max_dim # 使用变量替代硬编码的18 (Use variable instead of hardcoded 18)
@@ -1217,9 +1170,9 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
info['features'][feature_name]['shape'] = [actual_max_dim]
print(f"Updated {feature_name} shape to {actual_max_dim}")
# 更新视频总数
# 更新视频总数 (Update total videos)
info['total_videos'] = total_videos
print(f"更新视频总数为: {total_videos}")
print(f"更新视频总数为: {total_videos} (Update total videos to: {total_videos})")
with open(os.path.join(output_folder, 'meta', 'info.json'), 'w') as f:
json.dump(info, f, indent=4)
@@ -1236,15 +1189,7 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
if __name__ == "__main__":
# Define source folders and output folder
source_folders = ["/data1/realman/put_plastic_to_box/", "/data1/realman/put_metal_to_box/","/data1/realman/Find_blue_box_pick_it_up_put_it_on_yellow_table"]
# Add all 50 directories to the source_folders list
for i in range(1, 51): # 1 to 50
folder_name = f"/data1/realman/325_test/Use your left hand to put the parts on the desktop into the box{i}"
source_folders.append(folder_name)
# Print the number of source folders to verify
print(f"Number of source folders: {len(source_folders)}")
source_folders = ["/path/to/put_plastic_to_box/", "/path/to/put_metal_to_box/","/path/to/Find_blue_box"]
output_folder = "/data1/realman/lerobot_merged_test/"
@@ -1252,4 +1197,4 @@ if __name__ == "__main__":
default_fps = 20
# Merge the datasets
merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps)