diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py
index c56fb4b3..81107bad 100644
--- a/lerobot/scripts/merge.py
+++ b/lerobot/scripts/merge.py
@@ -11,7 +11,16 @@
 import logging
 
 def load_jsonl(file_path):
-    """Load data from a JSONL file."""
+    """
+    Load data from a JSONL file.
+
+    Args:
+        file_path (str): Path to the JSONL file.
+
+    Returns:
+        list: List of JSON objects, one per line of the file.
+    """
     data = []
 
     # Special handling for episodes_stats.jsonl
@@ -56,13 +65,30 @@ def load_jsonl(file_path):
     return data
 
 def save_jsonl(data, file_path):
-    """Save data to a JSONL file."""
+    """
+    Save data in JSONL format.
+
+    Args:
+        data (list): List of JSON objects to save.
+        file_path (str): Path to the output file.
+    """
     with open(file_path, 'w') as f:
         for item in data:
             f.write(json.dumps(item) + '\n')
 
 def merge_stats(stats_list):
-    """Merge statistics from multiple episodes."""
+    """
+    Merge statistics from multiple datasets, ensuring dimensional consistency.
+
+    Args:
+        stats_list (list): List of dictionaries containing statistics for each dataset.
+
+    Returns:
+        dict: Merged statistics.
+    """
     # Initialize merged stats with the structure of the first stats
     merged_stats = {}
 
@@ -284,7 +310,16 @@ def merge_stats(stats_list):
     return merged_stats
 
 def copy_videos(source_folders, output_folder, episode_mapping):
-    """Copy video files from source folders to output folder with updated episode indices."""
+    """
+    Copy video files from the source folders to the output folder, maintaining correct indices and structure.
+
+    Args:
+        source_folders (list): List of source dataset folder paths.
+        output_folder (str): Output folder path.
+        episode_mapping (list): List of (old_folder, old_index, new_index) tuples.
+    """
     # Get info.json to determine video structure
     info_path = os.path.join(source_folders[0], 'meta', 'info.json')
     with open(info_path, 'r') as f:
@@ -395,13 +430,24 @@ def copy_videos(source_folders, output_folder, episode_mapping):
             print(f"Warning: Video file not found for {video_key}, episode {old_index} in {old_folder}")
 
 def validate_timestamps(source_folders, tolerance_s=1e-4):
-    """验证源数据集的时间戳结构,识别潜在问题"""
+    """
+    Validate the timestamp structure of the source datasets and identify potential issues.
+
+    Args:
+        source_folders (list): List of source dataset folder paths.
+        tolerance_s (float): Tolerance for timestamp discontinuities, in seconds.
+
+    Returns:
+        tuple: (issues, fps_values) - the list of issues and the list of detected FPS values.
+    """
     issues = []
     fps_values = []
 
     for folder in source_folders:
         try:
-            # 尝试从 info.json 获取 FPS
+            # Try to get the FPS from info.json
             info_path = os.path.join(folder, 'meta', 'info.json')
             if os.path.exists(info_path):
                 with open(info_path, 'r') as f:
@@ -409,9 +455,9 @@ def validate_timestamps(source_folders, tolerance_s=1e-4):
                     if 'fps' in info:
                         fps = info['fps']
                         fps_values.append(fps)
-                        print(f"数据集 {folder} FPS={fps}")
+                        print(f"Dataset {folder} FPS={fps}")
 
-            # 检查是否有parquet文件包含时间戳
+            # Check whether any parquet files contain timestamps
             parquet_path = None
             for root, _, files in os.walk(os.path.join(folder, "parquet")):
                 for file in files:
@@ -434,86 +480,68 @@
                df = pd.read_parquet(parquet_path)
                timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col]
                if timestamp_cols:
-                    print(f"数据集 {folder} 包含时间戳列: {timestamp_cols}")
+                    print(f"Dataset {folder} contains timestamp columns: {timestamp_cols}")
                else:
-                    issues.append(f"警告: 数据集 {folder} 没有时间戳列")
+                    issues.append(f"Warning: dataset {folder} has no timestamp columns")
            else:
-                issues.append(f"警告: 数据集 {folder} 未找到parquet文件")
+                issues.append(f"Warning: no parquet files found in dataset {folder}")
        except Exception as e:
-            issues.append(f"错误: 验证数据集 {folder} 失败: {e}")
-            print(f"验证错误: {e}")
+            issues.append(f"Error: failed to validate dataset {folder}: {e}")
+            print(f"Validation error: {e}")
            traceback.print_exc()
 
-    # 检查FPS是否一致
+    # Check that the FPS values are consistent
    if len(set(fps_values)) > 1:
-        issues.append(f"警告: 数据集FPS不一致: {fps_values}")
+        issues.append(f"Warning: inconsistent FPS across datasets: {fps_values}")
 
    return issues, fps_values
 
-def normalize_timestamps_df(df, fps=None, base_time=None, episode_idx=0, dataset_idx=0, default_fps=20):
-    """
-    规范化DataFrame中的时间戳,强制创建连续时间序列,同时考虑数据集差异
-    """
-    normalized_df = df.copy()
-
-    # 找出所有时间戳列
-    timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col]
-
-    if not timestamp_cols:
-        return normalized_df  # 没有时间戳列,直接返回
-
-    # 如果未提供fps,使用默认值
-    if fps is None:
-        fps = default_fps
-
-    # 确定基准时间
-    if base_time is None:
-        # 使用当前时间作为基准,避免使用可能有问题的原始时间戳
-        base_time = datetime.now().timestamp()
-
-    # 为每个数据集添加一个较大的时间偏移(例如,每个数据集间隔1天)
-    dataset_offset = dataset_idx * 86400  # 86400秒 = 1天
-
-    # 为每个episode计算唯一的起始时间,避免重叠
-    frame_interval = 1.0 / fps
-    episode_duration = len(df) * frame_interval
-    # 添加10帧的缓冲,确保episodes之间有间隔
-    start_time = base_time + dataset_offset + episode_idx * (episode_duration + 10 * frame_interval)
-
-    print(f"数据集 {dataset_idx}, Episode {episode_idx}: 创建连续时间戳,起始于 {start_time}, 间隔 {frame_interval}秒")
-
-    # 对每个时间戳列创建严格递增的时间序列
-    for col in timestamp_cols:
-        normalized_df[col] = [start_time + i * frame_interval for i in range(len(df))]
-
-    return normalized_df
-
 def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, fps=None, episode_to_frame_index=None, folder_task_mapping=None, chunks_size=1000, default_fps=20):
-    """复制parquet数据文件"""
-    # 获取第一个数据集的FPS(如果未提供)
+    """
+    Copy and process parquet data files, including dimension padding and index updates.
+
+    Args:
+        source_folders (list): List of source dataset folder paths.
+        output_folder (str): Output folder path.
+        episode_mapping (list): List of (old_folder, old_index, new_index) tuples.
+        max_dim (int): Maximum dimension for vectors.
+        fps (float, optional): Frame rate; if not provided it is read from the first dataset.
+        episode_to_frame_index (dict, optional): Mapping from each new episode index to its starting frame index.
+        folder_task_mapping (dict, optional): Mapping of task_index values for each folder.
+        chunks_size (int): Number of episodes per chunk.
+        default_fps (float): Default frame rate used when none can be obtained from the dataset.
+
+    Returns:
+        bool: Whether at least one file was successfully copied.
+    """
+    # Get the FPS from the first dataset if it was not provided
    if fps is None:
        info_path = os.path.join(source_folders[0], 'meta', 'info.json')
        if os.path.exists(info_path):
            with open(info_path, 'r') as f:
                info = json.load(f)
-                fps = info.get('fps', default_fps)  # 使用变量替代硬编码的20
+                fps = info.get('fps', default_fps)  # use the variable instead of a hardcoded 20
        else:
-            fps = default_fps  # 使用变量替代硬编码的20
+            fps = default_fps  # use the variable instead of a hardcoded 20
 
-    print(f"使用FPS={fps}")
+    print(f"Using FPS={fps}")
 
-    # 为每个episode复制和处理数据文件
+    # Copy and process the data files for each episode
    total_copied = 0
    total_failed = 0
 
-    # 添加一个列表来记录失败的文件及原因
+    # Keep a list of the failed files and the reasons they failed
    failed_files = []
 
    for i, (old_folder, old_index, new_index) in enumerate(episode_mapping):
-        # 尝试找到源parquet文件
+        # Try to find the source parquet file
        episode_str = f"episode_{old_index:06d}.parquet"
        source_paths = [
            os.path.join(old_folder, "parquet", episode_str),
@@ -528,22 +556,19 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18,
 
        if source_path:
            try:
-                # 读取parquet文件
+                # Read the parquet file
                df = pd.read_parquet(source_path)
 
-                # 注释掉时间戳规范化
-                # df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx)
-
-                # 检查是否需要填充维度
+                # Check whether the dimensions need padding
                for feature in ['observation.state', 'action']:
                    if feature in df.columns:
-                        # 检查第一个非空值
+                        # Check the first non-null value
                        for idx, value in enumerate(df[feature]):
                            if value is not None and isinstance(value, (list, np.ndarray)):
                                current_dim = len(value)
                                if current_dim < max_dim:
-                                    print(f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维")
-                                    # 使用零填充到目标维度
+                                    print(f"Padding {feature} from {current_dim} to {max_dim} dimensions")
+                                    # Pad with zeros up to the target dimension
                                    df[feature] = df[feature].apply(
                                        lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist()
                                        if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim
@@ -551,59 +576,57 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18,
                                    )
                                break
 
-                # 确保目标目录存在
-                os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True)
-
-                # 更新episode_index列
+                # Update the episode_index column
                if 'episode_index' in df.columns:
-                    print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index}")
+                    print(f"Updating episode_index from {df['episode_index'].iloc[0]} to {new_index}")
                    df['episode_index'] = new_index
 
-                # 更新index列
+                # Update the index column
                if 'index' in df.columns:
                    if episode_to_frame_index and new_index in episode_to_frame_index:
-                        # 使用预先计算的帧索引起始值
+                        # Use the pre-computed starting frame index
                        first_index = episode_to_frame_index[new_index]
-                        print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)")
+                        print(f"Updating index column, start value: {first_index} (global cumulative frame count)")
                    else:
                        # 如果没有提供映射,使用当前的计算方式作为回退
+                        # (If no mapping is provided, fall back to the current calculation)
                        first_index = new_index * len(df)
-                        print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)")
+                        print(f"Updating index column, start value: {first_index} (episode index multiplied by episode length)")
 
-                # 更新所有帧的索引
+                # Update the index of every frame
                df['index'] = [first_index + i for i in range(len(df))]
 
-                # 更新task_index列
+                # Update the task_index column
                if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping:
-                    # 获取当前task_index
+                    # Get the current task_index
                    current_task_index = df['task_index'].iloc[0]
 
-                    # 检查是否有对应的新索引
+                    # Check whether there is a corresponding new index
                    if current_task_index in folder_task_mapping[old_folder]:
                        new_task_index = folder_task_mapping[old_folder][current_task_index]
-                        print(f"更新task_index从 {current_task_index} 到 {new_task_index}")
+                        print(f"Updating task_index from {current_task_index} to {new_task_index}")
                        df['task_index'] = new_task_index
                    else:
-                        print(f"警告: 找不到task_index {current_task_index}的映射关系")
+                        print(f"Warning: no mapping found for task_index {current_task_index}")
 
-                # 计算chunk编号
+                # Compute the chunk number
                chunk_index = new_index // chunks_size
 
-                # 创建正确的目标目录
+                # Create the correct target directory
                chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}")
                os.makedirs(chunk_dir, exist_ok=True)
 
-                # 构建正确的目标路径
+                # Build the correct target path
                dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet")
 
-                # 保存到正确位置
+                # Save to the correct location
                df.to_parquet(dest_path, index=False)
                total_copied += 1
-                print(f"已处理并保存: {dest_path}")
+                print(f"Processed and saved: {dest_path}")
 
            except Exception as e:
-                error_msg = f"处理 {source_path} 失败: {e}"
+                error_msg = f"Processing {source_path} failed: {e}"
                print(error_msg)
                traceback.print_exc()
                failed_files.append({"file": source_path, "reason": str(e), "episode": old_index})
@@ -617,66 +640,78 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18,
                        try:
                            source_path = os.path.join(root, file)
 
-                            # 读取parquet文件
+                            # Read the parquet file
                            df = pd.read_parquet(source_path)
 
-                            # 注释掉时间戳规范化
-                            # df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx)
+                            # Check whether the dimensions need padding
+                            for feature in ['observation.state', 'action']:
+                                if feature in df.columns:
+                                    # Check the first non-null value
+                                    for idx, value in enumerate(df[feature]):
+                                        if value is not None and isinstance(value, (list, np.ndarray)):
+                                            current_dim = len(value)
+                                            if current_dim < max_dim:
+                                                print(f"Padding {feature} from {current_dim} to {max_dim} dimensions")
+                                                # Pad with zeros up to the target dimension
+                                                df[feature] = df[feature].apply(
+                                                    lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist()
+                                                    if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim
+                                                    else x
+                                                )
+                                            break
 
-                            # 确保目标目录存在
-                            os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True)
-
-                            # 更新episode_index列
+                            # Update the episode_index column
                            if 'episode_index' in df.columns:
-                                print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index}")
+                                print(f"Updating episode_index from {df['episode_index'].iloc[0]} to {new_index}")
                                df['episode_index'] = new_index
 
-                            # 更新index列
+                            # Update the index column
                            if 'index' in df.columns:
                                if episode_to_frame_index and new_index in episode_to_frame_index:
-                                    # 使用预先计算的帧索引起始值
+                                    # Use the pre-computed starting frame index
                                    first_index = episode_to_frame_index[new_index]
-                                    print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)")
print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))") else: # 如果没有提供映射,使用当前的计算方式作为回退 + # (If no mapping provided, use current calculation as fallback) first_index = new_index * len(df) - print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)") + print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)(Update index column, start value: {first_index} (using episode index multiplied by length))") - # 更新所有帧的索引 + # 更新所有帧的索引 (Update indices for all frames) df['index'] = [first_index + i for i in range(len(df))] - # 更新task_index列 + # 更新task_index列 (Update task_index column) if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping: - # 获取当前task_index + # 获取当前task_index (Get current task_index) current_task_index = df['task_index'].iloc[0] - # 检查是否有对应的新索引 + # 检查是否有对应的新索引 (Check if there's a corresponding new index) if current_task_index in folder_task_mapping[old_folder]: new_task_index = folder_task_mapping[old_folder][current_task_index] - print(f"更新task_index从 {current_task_index} 到 {new_task_index}") + print(f"更新task_index从 {current_task_index} 到 {new_task_index} (Update task_index from {current_task_index} to {new_task_index})") df['task_index'] = new_task_index else: - print(f"警告: 找不到task_index {current_task_index}的映射关系") + print(f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})") - # 计算chunk编号 + # 计算chunk编号 (Calculate chunk number) chunk_index = new_index // chunks_size - # 创建正确的目标目录 + # 创建正确的目标目录 (Create correct target directory) chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}") os.makedirs(chunk_dir, exist_ok=True) - # 构建正确的目标路径 + # 构建正确的目标路径 (Build correct target path) dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet") - # 保存到正确位置 + # 保存到正确位置 (Save to correct location) df.to_parquet(dest_path, index=False) total_copied += 1 found = True - print(f"已处理并保存: {dest_path}") + print(f"已处理并保存: {dest_path} (Processed and saved: {dest_path})") break except Exception as e: - error_msg = f"处理 {source_path} 失败: {e}" + error_msg = f"处理 {source_path} 失败: {e} (Processing {source_path} failed: {e})" print(error_msg) traceback.print_exc() failed_files.append({"file": source_path, "reason": str(e), "episode": old_index}) @@ -692,132 +727,30 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, print(f"共复制 {total_copied} 个数据文件,{total_failed} 个失败") - # 打印所有失败的文件详情 + # 打印所有失败的文件详情 (Print details of all failed files) if failed_files: - print("\n失败的文件详情:") + print("\n失败的文件详情 (Details of failed files):") for i, failed in enumerate(failed_files): - print(f"{i+1}. 文件: {failed['file']}") + print(f"{i+1}. 
文件 (File): {failed['file']}") if 'folder' in failed: - print(f" 文件夹: {failed['folder']}") + print(f" 文件夹 (Folder): {failed['folder']}") if 'episode' in failed: - print(f" Episode索引: {failed['episode']}") - print(f" 原因: {failed['reason']}") + print(f" Episode索引 (Episode index): {failed['episode']}") + print(f" 原因 (Reason): {failed['reason']}") print("---") return total_copied > 0 -def copy_data_files_bak(source_folders, output_folder, episode_mapping): - """Copy data files from source folders to output folder with updated episode indices.""" - # Get info.json to determine data structure - info_path = os.path.join(source_folders[0], 'meta', 'info.json') - with open(info_path, 'r') as f: - info = json.load(f) - - data_path_template = info['data_path'] - - # Process each episode mapping - for old_folder, old_index, new_index in episode_mapping: - # Calculate chunk indices - old_episode_chunk = old_index // info['chunks_size'] - new_episode_chunk = new_index // info['chunks_size'] - - # Construct source path - source_data_path = os.path.join( - old_folder, - data_path_template.format( - episode_chunk=old_episode_chunk, - episode_index=old_index - ) - ) - - # Construct destination path - dest_data_path = os.path.join( - output_folder, - data_path_template.format( - episode_chunk=new_episode_chunk, - episode_index=new_index - ) - ) - - # Create destination directory if it doesn't exist - os.makedirs(os.path.dirname(dest_data_path), exist_ok=True) - - # Check if source file exists - if os.path.exists(source_data_path): - print(f"Copying data: {source_data_path} -> {dest_data_path}") - - # Check if we need to pad dimensions - try: - # Read the source parquet file to check dimensions - df = pd.read_parquet(source_data_path) - - # Check if observation.state is a vector and needs padding - needs_padding = False - if 'observation.state' in df.columns: - first_state = df['observation.state'].iloc[0] - if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18: - needs_padding = True - - if needs_padding: - # This needs padding to 18-dim - pad_parquet_data(source_data_path, dest_data_path, len(first_state), 18) - else: - # Just copy the file - shutil.copy2(source_data_path, dest_data_path) - except Exception as e: - print(f"Error processing {source_data_path}: {e}") - # Fall back to simple copy - shutil.copy2(source_data_path, dest_data_path) - else: - print(f"Warning: {source_data_path} not found, searching for parquet files in {old_folder}") - # Try to find the file by searching - found = False - for root, dirs, files in os.walk(old_folder): - for file in files: - if file.endswith('.parquet'): - source_data_path = os.path.join(root, file) - print(f"Found parquet file: {source_data_path}") - - # Create a destination path based on the found file - rel_path = os.path.relpath(source_data_path, old_folder) - dest_data_path = os.path.join(output_folder, rel_path) - - # Ensure destination directory exists - os.makedirs(os.path.dirname(dest_data_path), exist_ok=True) - - # Check if we need to pad dimensions - try: - # Read the source parquet file to check dimensions - df = pd.read_parquet(source_data_path) - - # Check if observation.state is a vector and needs padding - needs_padding = False - if 'observation.state' in df.columns: - first_state = df['observation.state'].iloc[0] - if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18: - needs_padding = True - - if needs_padding: - # This needs padding to 18-dim - pad_parquet_data(source_data_path, dest_data_path, 
-                            else:
-                                # Just copy the file
-                                shutil.copy2(source_data_path, dest_data_path)
-
-                            found = True
-                        except Exception as e:
-                            print(f"Error processing {source_data_path}: {e}")
-                            # Fall back to simple copy
-                            shutil.copy2(source_data_path, dest_data_path)
-                            found = True
-
-            if not found:
-                print(f"Warning: Could not find any parquet file in {old_folder}")
-
 def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18):
    """
-    读取parquet文件,将数据从original_dim维填充到target_dim维,
-    填充的值为0,然后保存到新的parquet文件。
+    Extend parquet data from the original dimension to the target dimension by zero-padding.
+
+    Args:
+        source_path (str): Source parquet file path.
+        target_path (str): Target parquet file path.
+        original_dim (int): Original vector dimension.
+        target_dim (int): Target vector dimension.
    """
    # 读取parquet文件
    df = pd.read_parquet(source_path)
@@ -877,7 +810,26 @@ def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18):
 
    return new_df
 
 def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, max_dim=18, default_fps=20):
-    """Merge multiple dataset folders into one."""
+    """
+    Merge multiple dataset folders into one, handling indices, dimensions, and metadata.
+
+    Args:
+        source_folders (list): List of source dataset folder paths.
+        output_folder (str): Output folder path.
+        validate_ts (bool): Whether to validate timestamps.
+        tolerance_s (float): Tolerance for timestamp discontinuities, in seconds.
+        max_dim (int): Maximum dimension for vectors.
+        default_fps (float): Default frame rate.
+
+    This function performs the following operations:
+    1. Merges all episodes, tasks, and stats.
+    2. Renumbers all indices to maintain continuity.
+    3. Pads vector dimensions for consistency.
+    4. Updates metadata files.
+    5. Copies and processes the data and video files.
+    """
    # Create output folder if it doesn't exist
    os.makedirs(output_folder, exist_ok=True)
    os.makedirs(os.path.join(output_folder, 'meta'), exist_ok=True)
@@ -940,12 +892,13 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
        info = json.load(f)
        chunks_size = info.get('chunks_size', 1000)
 
-    # 使用更简单的方法计算视频总数
+    # Use a simpler method to compute the total number of videos
    total_videos = 0
    for folder in source_folders:
        try:
            # 从每个数据集的info.json直接获取total_videos
+            # (Get total_videos directly from each dataset's info.json)
            folder_info_path = os.path.join(folder, 'meta', 'info.json')
            if os.path.exists(folder_info_path):
                with open(folder_info_path, 'r') as f:
@@ -953,7 +906,7 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
                    if 'total_videos' in folder_info:
                        folder_videos = folder_info['total_videos']
                        total_videos += folder_videos
-                        print(f"从{folder}的info.json中读取到视频数量: {folder_videos}")
+                        print(f"Read video count from {folder}'s info.json: {folder_videos}")
 
            # Check dimensions of state vectors in this folder
            folder_dim = max_dim  # 使用变量替代硬编码的18
@@ -1217,9 +1170,9 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
                info['features'][feature_name]['shape'] = [actual_max_dim]
                print(f"Updated {feature_name} shape to {actual_max_dim}")
 
-    # 更新视频总数
+    # Update the total number of videos
    info['total_videos'] = total_videos
-    print(f"更新视频总数为: {total_videos}")
+    print(f"Updated total videos to: {total_videos}")
 
    with open(os.path.join(output_folder, 'meta', 'info.json'), 'w') as f:
        json.dump(info, f, indent=4)
@@ -1236,15 +1189,7 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
 
 if __name__ == "__main__":
    # Define source folders and output folder
-    source_folders = ["/data1/realman/put_plastic_to_box/", "/data1/realman/put_metal_to_box/","/data1/realman/Find_blue_box_pick_it_up_put_it_on_yellow_table"]
-
-    # Add all 50 directories to the source_folders list
-    for i in range(1, 51):  # 1 to 50
-        folder_name = f"/data1/realman/325_test/Use your left hand to put the parts on the desktop into the box{i}"
-        source_folders.append(folder_name)
-
-    # Print the number of source folders to verify
-    print(f"Number of source folders: {len(source_folders)}")
+    source_folders = ["/path/to/put_plastic_to_box/", "/path/to/put_metal_to_box/", "/path/to/Find_blue_box"]
 
    output_folder = "/data1/realman/lerobot_merged_test/"
 
@@ -1252,4 +1197,4 @@ if __name__ == "__main__":
    default_fps = 20
 
    # Merge the datasets
-    merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps)
\ No newline at end of file
+    merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps)
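
Note (not part of the patch): a minimal standalone sketch of the zero-padding that copy_data_files applies to the observation.state / action columns above; the column values and the target dimension here are illustrative only.

import numpy as np
import pandas as pd

max_dim = 18  # illustrative target dimension

# Toy column with vectors of differing length, mirroring the patched behaviour.
df = pd.DataFrame({"observation.state": [[0.1, 0.2, 0.3], None, list(range(18))]})

# Pad list/array values with trailing zeros up to max_dim; None values and
# vectors that are already long enough are left untouched, as in the patch.
df["observation.state"] = df["observation.state"].apply(
    lambda x: np.pad(x, (0, max_dim - len(x)), "constant").tolist()
    if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim
    else x
)

print(len(df["observation.state"].iloc[0]))  # -> 18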