From 6b2e9448a24dbd155df7c53b208cf446abdaafe6 Mon Sep 17 00:00:00 2001
From: zhipeng tang <2444198418@qq.com>
Date: Tue, 1 Apr 2025 15:06:33 +0800
Subject: [PATCH 1/9] Improved Dataset Merge Script for Multiple Datasets
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR addresses the issue of merging, converting, and editing datasets. The improved merge.py script provides robust functionality for combining multiple datasets with different dimensions, tasks, and indices.

Key Improvements:
1. Multi-dataset Merging: Fixed the logic for merging datasets from different sources while preserving data integrity and continuity.
2. Dimension Handling: Added dynamic dimension detection and padding so that all observation and action vectors are consistently sized. The script now supports a configurable maximum dimension (default 18, overridable).
3. Index Consistency: Implemented continuous global frame indexing to avoid overlaps or gaps in indices after merging.
4. Task Mapping: Fixed task_index updates to ensure proper mapping across merged datasets with different task descriptions.
5. FPS Consistency: Added checks to ensure consistent FPS across datasets, with a configurable default value.
6. Directory Structure: Improved output directory organization using a chunk-based structure for better scalability.
7. Error Logging: Enhanced error reporting for failed files to aid debugging.

Usage Example:

# Define source folders and output folder
source_folders = [
    "/path/to/dataset1/",
    "/path/to/dataset2/",
    "/path/to/dataset3/"
]
output_folder = "/path/to/merged_dataset/"

# Merge the datasets with custom parameters
merge_datasets(
    source_folders,
    output_folder,
    max_dim=32,      # Set maximum dimension for observation.state and action
    default_fps=20   # Set default FPS if not specified in datasets
)
---
 lerobot/scripts/merge.py | 1255 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 1255 insertions(+)
 create mode 100644 lerobot/scripts/merge.py

diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py
new file mode 100644
index 00000000..c56fb4b3
--- /dev/null
+++ b/lerobot/scripts/merge.py
@@ -0,0 +1,1255 @@
+import json
+import os
+import shutil
+from pathlib import Path
+import numpy as np
+import pandas as pd
+import pandas as pd
+from datetime import datetime
+import traceback
+import logging
+
+
+def load_jsonl(file_path):
+    """Load data from a JSONL file."""
+    data = []
+
+    # Special handling for episodes_stats.jsonl
+    if 'episodes_stats.jsonl' in file_path:
+        try:
+            # Try to load the entire file as a JSON array
+            with open(file_path, 'r') as f:
+                content = f.read()
+                # Check if the content starts with '[' and ends with ']'
+                if content.strip().startswith('[') and content.strip().endswith(']'):
+                    return json.loads(content)
+                else:
+                    # Try to add brackets and parse
+                    try:
+                        return json.loads('[' + content + ']')
+                    except json.JSONDecodeError:
+                        pass
+        except Exception as e:
+            print(f"Error loading {file_path} as JSON array: {e}")
+
+        # Fall back to line-by-line parsing
+        try:
+            with open(file_path, 'r') as f:
+                for line in f:
+                    if line.strip():
+                        try:
+                            data.append(json.loads(line))
+                        except json.JSONDecodeError:
+                            pass
+        except Exception as e:
+            print(f"Error loading {file_path} line by line: {e}")
+    else:
+        # Standard JSONL parsing for other files
+        with open(file_path, 'r') as f:
+            for line in f:
+                if line.strip():
+                    try:
+                        data.append(json.loads(line))
+                    except json.JSONDecodeError:
+                        print(f"Warning: Could not parse line in {file_path}: 
{line[:100]}...") + + return data + +def save_jsonl(data, file_path): + """Save data to a JSONL file.""" + with open(file_path, 'w') as f: + for item in data: + f.write(json.dumps(item) + '\n') + +def merge_stats(stats_list): + """Merge statistics from multiple episodes.""" + # Initialize merged stats with the structure of the first stats + merged_stats = {} + + # Find common features across all stats + common_features = set(stats_list[0].keys()) + for stats in stats_list[1:]: + common_features = common_features.intersection(set(stats.keys())) + + # Process features in the order they appear in the first stats file + for feature in stats_list[0].keys(): + if feature not in common_features: + continue + + merged_stats[feature] = {} + + # Find common stat types for this feature + common_stat_types = [] + for stat_type in ['mean', 'std', 'max', 'min']: + if all(stat_type in stats[feature] for stats in stats_list): + common_stat_types.append(stat_type) + + # Determine the original shape of each value + original_shapes = [] + for stats in stats_list: + if 'mean' in stats[feature]: + shape = np.array(stats[feature]['mean']).shape + original_shapes.append(shape) + + # Special handling for image features to preserve nested structure + if feature.startswith('observation.images.'): + for stat_type in common_stat_types: + try: + # Get all values + values = [stats[feature][stat_type] for stats in stats_list] + + # For image features, we need to preserve the nested structure + # Initialize with the first value's structure + result = [] + + # For RGB channels + for channel_idx in range(len(values[0])): + channel_result = [] + + # For each pixel row + for pixel_idx in range(len(values[0][channel_idx])): + pixel_result = [] + + # For each pixel value + for value_idx in range(len(values[0][channel_idx][pixel_idx])): + # Calculate statistic based on type + if stat_type == 'mean': + # Simple average + avg = sum(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) / len(values) + pixel_result.append(avg) + elif stat_type == 'std': + # Simple average of std + avg = sum(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) / len(values) + pixel_result.append(avg) + elif stat_type == 'max': + # Maximum + max_val = max(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) + pixel_result.append(max_val) + elif stat_type == 'min': + # Minimum + min_val = min(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) + pixel_result.append(min_val) + + channel_result.append(pixel_result) + + result.append(channel_result) + + merged_stats[feature][stat_type] = result + except Exception as e: + print(f"Warning: Error processing image feature {feature}.{stat_type}: {e}") + # Fallback to first value + merged_stats[feature][stat_type] = values[0] + # If all shapes are the same, no need for special handling + elif len(set(str(shape) for shape in original_shapes)) == 1: + # All shapes are the same, use standard merging + for stat_type in common_stat_types: + values = [stats[feature][stat_type] for stats in stats_list] + + try: + # Calculate the new statistic based on the type + if stat_type == 'mean': + if all('count' in stats[feature] for stats in stats_list): + counts = [stats[feature]['count'][0] for stats in stats_list] + total_count = sum(counts) + weighted_values = [np.array(val) * count / total_count for val, count in zip(values, counts)] + merged_stats[feature][stat_type] = np.sum(weighted_values, axis=0).tolist() + else: + 
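For reference, the metadata files that load_jsonl and save_jsonl handle are plain JSON-lines files. A minimal round-trip sketch follows; the records are hypothetical and reduced to the fields the merge logic actually reads later (episode_index, length, task_index, task) — real entries carry more fields.

import json

episodes = [
    {"episode_index": 0, "length": 120},
    {"episode_index": 1, "length": 95},
]
tasks = [{"task_index": 0, "task": "pick up the part"}]

# Write one JSON object per line
with open("episodes.jsonl", "w") as f:
    for item in episodes:
        f.write(json.dumps(item) + "\n")

# Read it back, skipping blank lines
with open("episodes.jsonl") as f:
    loaded = [json.loads(line) for line in f if line.strip()]
assert loaded == episodes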
merged_stats[feature][stat_type] = np.mean(np.array(values), axis=0).tolist() + + elif stat_type == 'std': + if all('count' in stats[feature] for stats in stats_list): + counts = [stats[feature]['count'][0] for stats in stats_list] + total_count = sum(counts) + variances = [np.array(std)**2 for std in values] + weighted_variances = [var * count / total_count for var, count in zip(variances, counts)] + merged_stats[feature][stat_type] = np.sqrt(np.sum(weighted_variances, axis=0)).tolist() + else: + merged_stats[feature][stat_type] = np.mean(np.array(values), axis=0).tolist() + + elif stat_type == 'max': + merged_stats[feature][stat_type] = np.maximum.reduce(np.array(values)).tolist() + + elif stat_type == 'min': + merged_stats[feature][stat_type] = np.minimum.reduce(np.array(values)).tolist() + except Exception as e: + print(f"Warning: Error processing {feature}.{stat_type}: {e}") + continue + else: + # Shapes are different, need special handling for state vectors + if feature in ['observation.state', 'action']: + # For state vectors, we need to handle different dimensions + max_dim = max(len(np.array(stats[feature]['mean']).flatten()) for stats in stats_list) + + for stat_type in common_stat_types: + try: + # Get values and their original dimensions + values_with_dims = [] + for stats in stats_list: + val = np.array(stats[feature][stat_type]).flatten() + dim = len(val) + values_with_dims.append((val, dim)) + + # Initialize result array with zeros + result = np.zeros(max_dim) + + # Calculate statistics for each dimension separately + if stat_type == 'mean': + if all('count' in stats[feature] for stats in stats_list): + counts = [stats[feature]['count'][0] for stats in stats_list] + total_count = sum(counts) + + # For each dimension, calculate weighted mean of available values + for d in range(max_dim): + dim_values = [] + dim_weights = [] + for (val, dim), count in zip(values_with_dims, counts): + if d < dim: # Only use values that have this dimension + dim_values.append(val[d]) + dim_weights.append(count) + + if dim_values: # If we have values for this dimension + weighted_sum = sum(v * w for v, w in zip(dim_values, dim_weights)) + result[d] = weighted_sum / sum(dim_weights) + else: + # Simple average for each dimension + for d in range(max_dim): + dim_values = [val[d] for val, dim in values_with_dims if d < dim] + if dim_values: + result[d] = sum(dim_values) / len(dim_values) + + elif stat_type == 'std': + if all('count' in stats[feature] for stats in stats_list): + counts = [stats[feature]['count'][0] for stats in stats_list] + total_count = sum(counts) + + # For each dimension, calculate weighted variance + for d in range(max_dim): + dim_variances = [] + dim_weights = [] + for (val, dim), count in zip(values_with_dims, counts): + if d < dim: # Only use values that have this dimension + dim_variances.append(val[d]**2) # Square for variance + dim_weights.append(count) + + if dim_variances: # If we have values for this dimension + weighted_var = sum(v * w for v, w in zip(dim_variances, dim_weights)) / sum(dim_weights) + result[d] = np.sqrt(weighted_var) # Take sqrt for std + else: + # Simple average of std for each dimension + for d in range(max_dim): + dim_values = [val[d] for val, dim in values_with_dims if d < dim] + if dim_values: + result[d] = sum(dim_values) / len(dim_values) + + elif stat_type == 'max': + # For each dimension, take the maximum of available values + for d in range(max_dim): + dim_values = [val[d] for val, dim in values_with_dims if d < dim] + if dim_values: + 
result[d] = max(dim_values) + + elif stat_type == 'min': + # For each dimension, take the minimum of available values + for d in range(max_dim): + dim_values = [val[d] for val, dim in values_with_dims if d < dim] + if dim_values: + result[d] = min(dim_values) + + # Convert result to list and store + merged_stats[feature][stat_type] = result.tolist() + + except Exception as e: + print(f"Warning: Error processing {feature}.{stat_type} with different dimensions: {e}") + continue + else: + # For other features with different shapes, use the first shape as template + template_shape = original_shapes[0] + print(f"Using shape {template_shape} as template for {feature}") + + for stat_type in common_stat_types: + try: + # Use the first stats as template + merged_stats[feature][stat_type] = stats_list[0][feature][stat_type] + except Exception as e: + print(f"Warning: Error processing {feature}.{stat_type} with shape {template_shape}: {e}") + continue + + # Add count if available in all stats + if all('count' in stats[feature] for stats in stats_list): + try: + merged_stats[feature]['count'] = [sum(stats[feature]['count'][0] for stats in stats_list)] + except Exception as e: + print(f"Warning: Error processing {feature}.count: {e}") + + return merged_stats + +def copy_videos(source_folders, output_folder, episode_mapping): + """Copy video files from source folders to output folder with updated episode indices.""" + # Get info.json to determine video structure + info_path = os.path.join(source_folders[0], 'meta', 'info.json') + with open(info_path, 'r') as f: + info = json.load(f) + + video_path_template = info['video_path'] + + # Identify video keys from the template + # Example: "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4" + video_keys = [] + for feature_name, feature_info in info['features'].items(): + if feature_info.get('dtype') == 'video': + # Use the full feature name as the video key + video_keys.append(feature_name) + + print(f"Found video keys: {video_keys}") + + # Copy videos for each episode + for old_folder, old_index, new_index in episode_mapping: + # Determine episode chunk (usually 0 for small datasets) + episode_chunk = old_index // info['chunks_size'] + new_episode_chunk = new_index // info['chunks_size'] + + for video_key in video_keys: + # Try different possible source paths + source_patterns = [ + # Standard path with the episode index from metadata + os.path.join( + old_folder, + video_path_template.format( + episode_chunk=episode_chunk, + video_key=video_key, + episode_index=old_index + ) + ), + # Try with 0-based indexing + os.path.join( + old_folder, + video_path_template.format( + episode_chunk=0, + video_key=video_key, + episode_index=0 + ) + ), + # Try with different formatting + os.path.join( + old_folder, + f"videos/chunk-{episode_chunk:03d}/{video_key}/episode_{old_index}.mp4" + ), + os.path.join( + old_folder, + f"videos/chunk-000/{video_key}/episode_000000.mp4" + ), + ] + + # Find the first existing source path + source_video_path = None + for pattern in source_patterns: + if os.path.exists(pattern): + source_video_path = pattern + break + + if source_video_path: + # Construct destination path + dest_video_path = os.path.join( + output_folder, + video_path_template.format( + episode_chunk=new_episode_chunk, + video_key=video_key, + episode_index=new_index + ) + ) + + # Create destination directory if it doesn't exist + os.makedirs(os.path.dirname(dest_video_path), exist_ok=True) + + print(f"Copying video: {source_video_path} -> 
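The same-shape branch of merge_stats combines per-dataset statistics with count weighting: means are weighted by frame counts and standard deviations are pooled through their variances. Below is a compact standalone sketch of that weighting scheme (not the script's API; the example numbers are made up).

import numpy as np

def combine_mean_std(means, stds, counts):
    """Count-weighted mean and pooled std across datasets, per dimension."""
    means = np.asarray(means, dtype=float)
    stds = np.asarray(stds, dtype=float)
    counts = np.asarray(counts, dtype=float)[:, None]
    total = counts.sum()
    mean = (means * counts).sum(axis=0) / total
    # Weighted average of the variances, then square root
    std = np.sqrt((stds ** 2 * counts).sum(axis=0) / total)
    return mean.tolist(), std.tolist()

m, s = combine_mean_std(
    means=[[0.1, 0.2], [0.3, 0.4]],
    stds=[[0.05, 0.05], [0.10, 0.10]],
    counts=[100, 300],
)

As in the script, the pooled variance does not account for the spread between the per-dataset means, only for the per-dataset variances.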
{dest_video_path}") + shutil.copy2(source_video_path, dest_video_path) + else: + # If no file is found, search the directory recursively + found = False + for root, _, files in os.walk(os.path.join(old_folder, "videos")): + for file in files: + if file.endswith('.mp4') and video_key in root: + source_video_path = os.path.join(root, file) + + # Construct destination path + dest_video_path = os.path.join( + output_folder, + video_path_template.format( + episode_chunk=new_episode_chunk, + video_key=video_key, + episode_index=new_index + ) + ) + + # Create destination directory if it doesn't exist + os.makedirs(os.path.dirname(dest_video_path), exist_ok=True) + + print(f"Copying video (found by search): {source_video_path} -> {dest_video_path}") + shutil.copy2(source_video_path, dest_video_path) + found = True + break + if found: + break + + if not found: + print(f"Warning: Video file not found for {video_key}, episode {old_index} in {old_folder}") + +def validate_timestamps(source_folders, tolerance_s=1e-4): + """验证源数据集的时间戳结构,识别潜在问题""" + issues = [] + fps_values = [] + + for folder in source_folders: + try: + # 尝试从 info.json 获取 FPS + info_path = os.path.join(folder, 'meta', 'info.json') + if os.path.exists(info_path): + with open(info_path, 'r') as f: + info = json.load(f) + if 'fps' in info: + fps = info['fps'] + fps_values.append(fps) + print(f"数据集 {folder} FPS={fps}") + + # 检查是否有parquet文件包含时间戳 + parquet_path = None + for root, _, files in os.walk(os.path.join(folder, "parquet")): + for file in files: + if file.endswith('.parquet'): + parquet_path = os.path.join(root, file) + break + if parquet_path: + break + + if not parquet_path: + for root, _, files in os.walk(os.path.join(folder, "data")): + for file in files: + if file.endswith('.parquet'): + parquet_path = os.path.join(root, file) + break + if parquet_path: + break + + if parquet_path: + df = pd.read_parquet(parquet_path) + timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col] + if timestamp_cols: + print(f"数据集 {folder} 包含时间戳列: {timestamp_cols}") + else: + issues.append(f"警告: 数据集 {folder} 没有时间戳列") + else: + issues.append(f"警告: 数据集 {folder} 未找到parquet文件") + + except Exception as e: + issues.append(f"错误: 验证数据集 {folder} 失败: {e}") + print(f"验证错误: {e}") + traceback.print_exc() + + # 检查FPS是否一致 + if len(set(fps_values)) > 1: + issues.append(f"警告: 数据集FPS不一致: {fps_values}") + + return issues, fps_values + +def normalize_timestamps_df(df, fps=None, base_time=None, episode_idx=0, dataset_idx=0, default_fps=20): + """ + 规范化DataFrame中的时间戳,强制创建连续时间序列,同时考虑数据集差异 + """ + normalized_df = df.copy() + + # 找出所有时间戳列 + timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col] + + if not timestamp_cols: + return normalized_df # 没有时间戳列,直接返回 + + # 如果未提供fps,使用默认值 + if fps is None: + fps = default_fps + + # 确定基准时间 + if base_time is None: + # 使用当前时间作为基准,避免使用可能有问题的原始时间戳 + base_time = datetime.now().timestamp() + + # 为每个数据集添加一个较大的时间偏移(例如,每个数据集间隔1天) + dataset_offset = dataset_idx * 86400 # 86400秒 = 1天 + + # 为每个episode计算唯一的起始时间,避免重叠 + frame_interval = 1.0 / fps + episode_duration = len(df) * frame_interval + # 添加10帧的缓冲,确保episodes之间有间隔 + start_time = base_time + dataset_offset + episode_idx * (episode_duration + 10 * frame_interval) + + print(f"数据集 {dataset_idx}, Episode {episode_idx}: 创建连续时间戳,起始于 {start_time}, 间隔 {frame_interval}秒") + + # 对每个时间戳列创建严格递增的时间序列 + for col in timestamp_cols: + normalized_df[col] = [start_time + i * frame_interval for i in range(len(df))] + + return normalized_df + + +def 
copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, fps=None, + episode_to_frame_index=None, folder_task_mapping=None, chunks_size=1000, default_fps=20): + """复制parquet数据文件""" + # 获取第一个数据集的FPS(如果未提供) + if fps is None: + info_path = os.path.join(source_folders[0], 'meta', 'info.json') + if os.path.exists(info_path): + with open(info_path, 'r') as f: + info = json.load(f) + fps = info.get('fps', default_fps) # 使用变量替代硬编码的20 + else: + fps = default_fps # 使用变量替代硬编码的20 + + print(f"使用FPS={fps}") + + # 为每个episode复制和处理数据文件 + total_copied = 0 + total_failed = 0 + + # 添加一个列表来记录失败的文件及原因 + failed_files = [] + + for i, (old_folder, old_index, new_index) in enumerate(episode_mapping): + # 尝试找到源parquet文件 + episode_str = f"episode_{old_index:06d}.parquet" + source_paths = [ + os.path.join(old_folder, "parquet", episode_str), + os.path.join(old_folder, "data", episode_str) + ] + + source_path = None + for path in source_paths: + if os.path.exists(path): + source_path = path + break + + if source_path: + try: + # 读取parquet文件 + df = pd.read_parquet(source_path) + + # 注释掉时间戳规范化 + # df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx) + + # 检查是否需要填充维度 + for feature in ['observation.state', 'action']: + if feature in df.columns: + # 检查第一个非空值 + for idx, value in enumerate(df[feature]): + if value is not None and isinstance(value, (list, np.ndarray)): + current_dim = len(value) + if current_dim < max_dim: + print(f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维") + # 使用零填充到目标维度 + df[feature] = df[feature].apply( + lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist() + if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim + else x + ) + break + + # 确保目标目录存在 + os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True) + + # 更新episode_index列 + if 'episode_index' in df.columns: + print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index}") + df['episode_index'] = new_index + + # 更新index列 + if 'index' in df.columns: + if episode_to_frame_index and new_index in episode_to_frame_index: + # 使用预先计算的帧索引起始值 + first_index = episode_to_frame_index[new_index] + print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)") + else: + # 如果没有提供映射,使用当前的计算方式作为回退 + first_index = new_index * len(df) + print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)") + + # 更新所有帧的索引 + df['index'] = [first_index + i for i in range(len(df))] + + # 更新task_index列 + if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping: + # 获取当前task_index + current_task_index = df['task_index'].iloc[0] + + # 检查是否有对应的新索引 + if current_task_index in folder_task_mapping[old_folder]: + new_task_index = folder_task_mapping[old_folder][current_task_index] + print(f"更新task_index从 {current_task_index} 到 {new_task_index}") + df['task_index'] = new_task_index + else: + print(f"警告: 找不到task_index {current_task_index}的映射关系") + + # 计算chunk编号 + chunk_index = new_index // chunks_size + + # 创建正确的目标目录 + chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}") + os.makedirs(chunk_dir, exist_ok=True) + + # 构建正确的目标路径 + dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet") + + # 保存到正确位置 + df.to_parquet(dest_path, index=False) + + total_copied += 1 + print(f"已处理并保存: {dest_path}") + + except Exception as e: + error_msg = f"处理 {source_path} 失败: {e}" + print(error_msg) + traceback.print_exc() + failed_files.append({"file": source_path, "reason": str(e), "episode": old_index}) + total_failed += 1 + else: + # 文件不在标准位置,尝试递归搜索 + found = False + for root, _, 
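In summary, each copied episode receives three in-place updates before being written to its chunk: vector columns are zero-padded to max_dim, the episode and global frame indices are rebased, and task_index is remapped. A condensed sketch of that transformation on a single DataFrame is shown below; the function name and arguments are illustrative, not part of the script.

import numpy as np
import pandas as pd

def rewrite_episode(df, new_episode_index, first_frame_index, task_map, max_dim=18):
    """Pad vector columns, rebase indices, and remap task_index for one episode."""
    df = df.copy()
    for col in ("observation.state", "action"):
        if col in df.columns:
            # Zero-pad short vectors up to max_dim, leave others untouched
            df[col] = df[col].apply(
                lambda x: np.pad(x, (0, max_dim - len(x)), "constant").tolist()
                if isinstance(x, (list, np.ndarray)) and len(x) < max_dim
                else x
            )
    if "episode_index" in df.columns:
        df["episode_index"] = new_episode_index
    if "index" in df.columns:
        # Continuous global frame indexing across the merged dataset
        df["index"] = first_frame_index + np.arange(len(df))
    if "task_index" in df.columns:
        df["task_index"] = df["task_index"].map(lambda t: task_map.get(t, t))
    return df

# e.g. new episode 3 of a 14-dim dataset, starting at global frame 1200:
# df = rewrite_episode(pd.read_parquet(path), 3, 1200, {0: 5}, max_dim=18)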
files in os.walk(old_folder): + for file in files: + if file.endswith('.parquet') and f"episode_{old_index:06d}" in file: + try: + source_path = os.path.join(root, file) + + # 读取parquet文件 + df = pd.read_parquet(source_path) + + # 注释掉时间戳规范化 + # df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx) + + # 确保目标目录存在 + os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True) + + # 更新episode_index列 + if 'episode_index' in df.columns: + print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index}") + df['episode_index'] = new_index + + # 更新index列 + if 'index' in df.columns: + if episode_to_frame_index and new_index in episode_to_frame_index: + # 使用预先计算的帧索引起始值 + first_index = episode_to_frame_index[new_index] + print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)") + else: + # 如果没有提供映射,使用当前的计算方式作为回退 + first_index = new_index * len(df) + print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)") + + # 更新所有帧的索引 + df['index'] = [first_index + i for i in range(len(df))] + + # 更新task_index列 + if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping: + # 获取当前task_index + current_task_index = df['task_index'].iloc[0] + + # 检查是否有对应的新索引 + if current_task_index in folder_task_mapping[old_folder]: + new_task_index = folder_task_mapping[old_folder][current_task_index] + print(f"更新task_index从 {current_task_index} 到 {new_task_index}") + df['task_index'] = new_task_index + else: + print(f"警告: 找不到task_index {current_task_index}的映射关系") + + # 计算chunk编号 + chunk_index = new_index // chunks_size + + # 创建正确的目标目录 + chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}") + os.makedirs(chunk_dir, exist_ok=True) + + # 构建正确的目标路径 + dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet") + + # 保存到正确位置 + df.to_parquet(dest_path, index=False) + + total_copied += 1 + found = True + print(f"已处理并保存: {dest_path}") + break + except Exception as e: + error_msg = f"处理 {source_path} 失败: {e}" + print(error_msg) + traceback.print_exc() + failed_files.append({"file": source_path, "reason": str(e), "episode": old_index}) + total_failed += 1 + if found: + break + + if not found: + error_msg = f"找不到episode {old_index}的parquet文件,源文件夹: {old_folder}" + print(error_msg) + failed_files.append({"file": f"episode_{old_index:06d}.parquet", "reason": "文件未找到", "folder": old_folder}) + total_failed += 1 + + print(f"共复制 {total_copied} 个数据文件,{total_failed} 个失败") + + # 打印所有失败的文件详情 + if failed_files: + print("\n失败的文件详情:") + for i, failed in enumerate(failed_files): + print(f"{i+1}. 
文件: {failed['file']}") + if 'folder' in failed: + print(f" 文件夹: {failed['folder']}") + if 'episode' in failed: + print(f" Episode索引: {failed['episode']}") + print(f" 原因: {failed['reason']}") + print("---") + + return total_copied > 0 + +def copy_data_files_bak(source_folders, output_folder, episode_mapping): + """Copy data files from source folders to output folder with updated episode indices.""" + # Get info.json to determine data structure + info_path = os.path.join(source_folders[0], 'meta', 'info.json') + with open(info_path, 'r') as f: + info = json.load(f) + + data_path_template = info['data_path'] + + # Process each episode mapping + for old_folder, old_index, new_index in episode_mapping: + # Calculate chunk indices + old_episode_chunk = old_index // info['chunks_size'] + new_episode_chunk = new_index // info['chunks_size'] + + # Construct source path + source_data_path = os.path.join( + old_folder, + data_path_template.format( + episode_chunk=old_episode_chunk, + episode_index=old_index + ) + ) + + # Construct destination path + dest_data_path = os.path.join( + output_folder, + data_path_template.format( + episode_chunk=new_episode_chunk, + episode_index=new_index + ) + ) + + # Create destination directory if it doesn't exist + os.makedirs(os.path.dirname(dest_data_path), exist_ok=True) + + # Check if source file exists + if os.path.exists(source_data_path): + print(f"Copying data: {source_data_path} -> {dest_data_path}") + + # Check if we need to pad dimensions + try: + # Read the source parquet file to check dimensions + df = pd.read_parquet(source_data_path) + + # Check if observation.state is a vector and needs padding + needs_padding = False + if 'observation.state' in df.columns: + first_state = df['observation.state'].iloc[0] + if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18: + needs_padding = True + + if needs_padding: + # This needs padding to 18-dim + pad_parquet_data(source_data_path, dest_data_path, len(first_state), 18) + else: + # Just copy the file + shutil.copy2(source_data_path, dest_data_path) + except Exception as e: + print(f"Error processing {source_data_path}: {e}") + # Fall back to simple copy + shutil.copy2(source_data_path, dest_data_path) + else: + print(f"Warning: {source_data_path} not found, searching for parquet files in {old_folder}") + # Try to find the file by searching + found = False + for root, dirs, files in os.walk(old_folder): + for file in files: + if file.endswith('.parquet'): + source_data_path = os.path.join(root, file) + print(f"Found parquet file: {source_data_path}") + + # Create a destination path based on the found file + rel_path = os.path.relpath(source_data_path, old_folder) + dest_data_path = os.path.join(output_folder, rel_path) + + # Ensure destination directory exists + os.makedirs(os.path.dirname(dest_data_path), exist_ok=True) + + # Check if we need to pad dimensions + try: + # Read the source parquet file to check dimensions + df = pd.read_parquet(source_data_path) + + # Check if observation.state is a vector and needs padding + needs_padding = False + if 'observation.state' in df.columns: + first_state = df['observation.state'].iloc[0] + if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18: + needs_padding = True + + if needs_padding: + # This needs padding to 18-dim + pad_parquet_data(source_data_path, dest_data_path, len(first_state), 18) + else: + # Just copy the file + shutil.copy2(source_data_path, dest_data_path) + + found = True + except Exception as e: + print(f"Error 
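The chunked output layout follows the path templates read from info.json. A small sketch of how an episode index maps to its data and video paths under the default chunks_size of 1000 — the template strings mirror those in the script, while the video key used here is only a placeholder.

def episode_paths(episode_index, video_key="observation.images.cam_high", chunks_size=1000):
    """Return the chunked parquet and video paths for one episode index."""
    # "observation.images.cam_high" is a placeholder video key for illustration
    chunk = episode_index // chunks_size
    data_path = f"data/chunk-{chunk:03d}/episode_{episode_index:06d}.parquet"
    video_path = f"videos/chunk-{chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4"
    return data_path, video_path

print(episode_paths(0))     # data/chunk-000/episode_000000.parquet, ...
print(episode_paths(1234))  # data/chunk-001/episode_001234.parquet, ...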
processing {source_data_path}: {e}") + # Fall back to simple copy + shutil.copy2(source_data_path, dest_data_path) + found = True + + if not found: + print(f"Warning: Could not find any parquet file in {old_folder}") + +def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18): + """ + 读取parquet文件,将数据从original_dim维填充到target_dim维, + 填充的值为0,然后保存到新的parquet文件。 + """ + # 读取parquet文件 + df = pd.read_parquet(source_path) + + # 打印列名以便调试 + print(f"Columns in {source_path}: {df.columns.tolist()}") + + # 创建新的DataFrame来存储填充后的数据 + new_df = df.copy() + + # 检查observation.state和action列是否存在 + if 'observation.state' in df.columns: + # 检查第一行数据,确认是否为向量 + first_state = df['observation.state'].iloc[0] + print(f"First observation.state type: {type(first_state)}, value: {first_state}") + + # 如果是向量(列表或numpy数组) + if isinstance(first_state, (list, np.ndarray)): + # 检查维度 + state_dim = len(first_state) + print(f"observation.state dimension: {state_dim}") + + if state_dim < target_dim: + # 填充向量 + print(f"Padding observation.state from {state_dim} to {target_dim} dimensions") + new_df['observation.state'] = df['observation.state'].apply( + lambda x: np.pad(x, (0, target_dim - len(x)), 'constant').tolist() + ) + + # 同样处理action列 + if 'action' in df.columns: + # 检查第一行数据 + first_action = df['action'].iloc[0] + print(f"First action type: {type(first_action)}, value: {first_action}") + + # 如果是向量 + if isinstance(first_action, (list, np.ndarray)): + # 检查维度 + action_dim = len(first_action) + print(f"action dimension: {action_dim}") + + if action_dim < target_dim: + # 填充向量 + print(f"Padding action from {action_dim} to {target_dim} dimensions") + new_df['action'] = df['action'].apply( + lambda x: np.pad(x, (0, target_dim - len(x)), 'constant').tolist() + ) + + # 确保目标目录存在 + os.makedirs(os.path.dirname(target_path), exist_ok=True) + + # 保存到新的parquet文件 + new_df.to_parquet(target_path, index=False) + + print(f"已将{source_path}处理并保存到{target_path}") + + return new_df + +def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, max_dim=18, default_fps=20): + """Merge multiple dataset folders into one.""" + # Create output folder if it doesn't exist + os.makedirs(output_folder, exist_ok=True) + os.makedirs(os.path.join(output_folder, 'meta'), exist_ok=True) + + # 注释掉时间戳验证 + # if validate_ts: + # issues, fps_values = validate_timestamps(source_folders, tolerance_s) + # if issues: + # print("时间戳验证发现以下问题:") + # for issue in issues: + # print(f" - {issue}") + # + # # 获取共同的FPS值 + # if fps_values: + # fps = max(set(fps_values), key=fps_values.count) + # print(f"使用共同FPS值: {fps}") + # else: + # fps = default_fps + # print(f"未找到FPS值,使用默认值: {default_fps}") + # else: + fps = default_fps + print(f"使用默认FPS值: {fps}") + + # Load episodes from all source folders + all_episodes = [] + all_episodes_stats = [] + all_tasks = [] + + total_frames = 0 + total_episodes = 0 + + # Keep track of episode mapping (old_folder, old_index, new_index) + episode_mapping = [] + + # Collect all stats for proper merging + all_stats_data = [] + + # Track dimensions for each folder + folder_dimensions = {} + + # 添加一个变量来跟踪累积的帧数 + cumulative_frame_count = 0 + + # 创建一个映射,用于存储每个新的episode索引对应的起始帧索引 + episode_to_frame_index = {} + + # 创建一个映射,用于跟踪旧的任务描述到新任务索引的映射 + task_desc_to_new_index = {} + # 创建一个映射,用于存储每个源文件夹和旧任务索引到新任务索引的映射 + folder_task_mapping = {} + + # 首先收集所有不同的任务描述 + all_unique_tasks = [] + + # 从info.json获取chunks_size + info_path = os.path.join(source_folders[0], 'meta', 'info.json') + chunks_size = 1000 # 默认值 + if 
os.path.exists(info_path): + with open(info_path, 'r') as f: + info = json.load(f) + chunks_size = info.get('chunks_size', 1000) + + # 使用更简单的方法计算视频总数 + total_videos = 0 + + for folder in source_folders: + try: + # 从每个数据集的info.json直接获取total_videos + folder_info_path = os.path.join(folder, 'meta', 'info.json') + if os.path.exists(folder_info_path): + with open(folder_info_path, 'r') as f: + folder_info = json.load(f) + if 'total_videos' in folder_info: + folder_videos = folder_info['total_videos'] + total_videos += folder_videos + print(f"从{folder}的info.json中读取到视频数量: {folder_videos}") + + # Check dimensions of state vectors in this folder + folder_dim = max_dim # 使用变量替代硬编码的18 + + # Try to find a parquet file to determine dimensions + for root, dirs, files in os.walk(folder): + for file in files: + if file.endswith('.parquet'): + try: + df = pd.read_parquet(os.path.join(root, file)) + if 'observation.state' in df.columns: + first_state = df['observation.state'].iloc[0] + if isinstance(first_state, (list, np.ndarray)): + folder_dim = len(first_state) + print(f"Detected {folder_dim} dimensions in {folder}") + break + except Exception as e: + print(f"Error checking dimensions in {folder}: {e}") + break + if folder_dim != max_dim: # 使用变量替代硬编码的18 + break + + folder_dimensions[folder] = folder_dim + + # Load episodes + episodes_path = os.path.join(folder, 'meta', 'episodes.jsonl') + if not os.path.exists(episodes_path): + print(f"Warning: Episodes file not found in {folder}, skipping") + continue + + episodes = load_jsonl(episodes_path) + + # Load episode stats + episodes_stats_path = os.path.join(folder, 'meta', 'episodes_stats.jsonl') + episodes_stats = [] + if os.path.exists(episodes_stats_path): + episodes_stats = load_jsonl(episodes_stats_path) + + # Create a mapping of episode_index to stats + stats_map = {} + for stat in episodes_stats: + if 'episode_index' in stat: + stats_map[stat['episode_index']] = stat + + # Load tasks + tasks_path = os.path.join(folder, 'meta', 'tasks.jsonl') + folder_tasks = [] + if os.path.exists(tasks_path): + folder_tasks = load_jsonl(tasks_path) + + # 创建此文件夹的任务映射 + folder_task_mapping[folder] = {} + + # 处理每个任务 + for task in folder_tasks: + task_desc = task['task'] + old_index = task['task_index'] + + # 检查任务描述是否已存在 + if task_desc not in task_desc_to_new_index: + # 添加新任务描述,分配新索引 + new_index = len(all_unique_tasks) + task_desc_to_new_index[task_desc] = new_index + all_unique_tasks.append({"task_index": new_index, "task": task_desc}) + + # 保存此文件夹中旧索引到新索引的映射 + folder_task_mapping[folder][old_index] = task_desc_to_new_index[task_desc] + + # Process all episodes from this folder + for episode in episodes: + old_index = episode['episode_index'] + new_index = total_episodes + + # Update episode index + episode['episode_index'] = new_index + all_episodes.append(episode) + + # Update stats if available + if old_index in stats_map: + stats = stats_map[old_index] + stats['episode_index'] = new_index + + # Pad stats data if needed + if 'stats' in stats and folder_dimensions[folder] < max_dim: # 使用变量替代硬编码的18 + # Pad observation.state and action stats + for feature in ['observation.state', 'action']: + if feature in stats['stats']: + for stat_type in ['mean', 'std', 'max', 'min']: + if stat_type in stats['stats'][feature]: + # Get current values + values = stats['stats'][feature][stat_type] + + # Check if it's a list/array that needs padding + if isinstance(values, list) and len(values) < max_dim: # 使用变量替代硬编码的18 + # Pad with zeros + padded = values + [0.0] * (max_dim - 
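merge_datasets deduplicates task descriptions across all source folders and records, per folder, how old task indices map to the new global ones. The standalone sketch below mirrors that bookkeeping; the folder paths and task strings are made up.

def build_task_mappings(folder_tasks):
    """folder_tasks: {folder: [{"task_index": int, "task": str}, ...]}"""
    all_unique_tasks = []          # contents of the merged tasks.jsonl
    task_desc_to_new_index = {}    # task description -> new global index
    folder_task_mapping = {}       # folder -> {old_index: new_index}
    for folder, tasks in folder_tasks.items():
        folder_task_mapping[folder] = {}
        for task in tasks:
            desc = task["task"]
            if desc not in task_desc_to_new_index:
                task_desc_to_new_index[desc] = len(all_unique_tasks)
                all_unique_tasks.append({"task_index": len(all_unique_tasks), "task": desc})
            folder_task_mapping[folder][task["task_index"]] = task_desc_to_new_index[desc]
    return all_unique_tasks, folder_task_mapping

tasks, mapping = build_task_mappings({
    "/data/ds_a": [{"task_index": 0, "task": "put the metal part in the box"}],
    "/data/ds_b": [{"task_index": 0, "task": "put the plastic part in the box"}],
})
# tasks has two entries (indices 0 and 1); mapping["/data/ds_b"][0] == 1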
len(values)) # 使用变量替代硬编码的18 + stats['stats'][feature][stat_type] = padded + + all_episodes_stats.append(stats) + + # Add to all_stats_data for proper merging + if 'stats' in stats: + all_stats_data.append(stats['stats']) + + # Add to mapping + episode_mapping.append((folder, old_index, new_index)) + + # Update counters + total_episodes += 1 + total_frames += episode['length'] + + # 处理每个episode时收集此信息 + episode_to_frame_index[new_index] = cumulative_frame_count + cumulative_frame_count += episode['length'] + + # 使用收集的唯一任务列表替换之前的任务处理逻辑 + all_tasks = all_unique_tasks + + except Exception as e: + print(f"Error processing folder {folder}: {e}") + continue + + print(f"Processed {total_episodes} episodes from {len(source_folders)} folders") + + # Save combined episodes and stats + save_jsonl(all_episodes, os.path.join(output_folder, 'meta', 'episodes.jsonl')) + save_jsonl(all_episodes_stats, os.path.join(output_folder, 'meta', 'episodes_stats.jsonl')) + save_jsonl(all_tasks, os.path.join(output_folder, 'meta', 'tasks.jsonl')) + + # Merge and save stats + stats_list = [] + for folder in source_folders: + stats_path = os.path.join(folder, 'meta', 'stats.json') + if os.path.exists(stats_path): + with open(stats_path, 'r') as f: + stats = json.load(f) + stats_list.append(stats) + + if stats_list: + # Merge global stats + merged_stats = merge_stats(stats_list) + + # Update merged stats with episode-specific stats if available + if all_stats_data: + # For each feature in the stats + for feature in merged_stats.keys(): + if feature in all_stats_data[0]: + # Recalculate statistics based on all episodes + values = [stat[feature] for stat in all_stats_data if feature in stat] + + # Find the maximum dimension for this feature + max_dim = max(len(np.array(val.get('mean', [0])).flatten()) for val in values if 'mean' in val) + + # Update count + if 'count' in merged_stats[feature]: + merged_stats[feature]['count'] = [sum(stat.get('count', [0])[0] for stat in values if 'count' in stat)] + + # Update min/max with padding + if 'min' in merged_stats[feature] and all('min' in stat for stat in values): + # Pad min values + padded_mins = [] + for val in values: + val_array = np.array(val['min']) + val_flat = val_array.flatten() + if len(val_flat) < max_dim: + padded = np.zeros(max_dim) + padded[:len(val_flat)] = val_flat + padded_mins.append(padded) + else: + padded_mins.append(val_flat) + merged_stats[feature]['min'] = np.minimum.reduce(padded_mins).tolist() + + if 'max' in merged_stats[feature] and all('max' in stat for stat in values): + # Pad max values + padded_maxs = [] + for val in values: + val_array = np.array(val['max']) + val_flat = val_array.flatten() + if len(val_flat) < max_dim: + padded = np.zeros(max_dim) + padded[:len(val_flat)] = val_flat + padded_maxs.append(padded) + else: + padded_maxs.append(val_flat) + merged_stats[feature]['max'] = np.maximum.reduce(padded_maxs).tolist() + + # Update mean and std (weighted by count if available) + if 'mean' in merged_stats[feature] and all('mean' in stat for stat in values): + # Pad mean values + padded_means = [] + for val in values: + val_array = np.array(val['mean']) + val_flat = val_array.flatten() + if len(val_flat) < max_dim: + padded = np.zeros(max_dim) + padded[:len(val_flat)] = val_flat + padded_means.append(padded) + else: + padded_means.append(val_flat) + + if all('count' in stat for stat in values): + counts = [stat['count'][0] for stat in values] + total_count = sum(counts) + weighted_means = [mean * count / total_count for mean, count in 
zip(padded_means, counts)] + merged_stats[feature]['mean'] = np.sum(weighted_means, axis=0).tolist() + else: + merged_stats[feature]['mean'] = np.mean(padded_means, axis=0).tolist() + + if 'std' in merged_stats[feature] and all('std' in stat for stat in values): + # Pad std values + padded_stds = [] + for val in values: + val_array = np.array(val['std']) + val_flat = val_array.flatten() + if len(val_flat) < max_dim: + padded = np.zeros(max_dim) + padded[:len(val_flat)] = val_flat + padded_stds.append(padded) + else: + padded_stds.append(val_flat) + + if all('count' in stat for stat in values): + counts = [stat['count'][0] for stat in values] + total_count = sum(counts) + variances = [std**2 for std in padded_stds] + weighted_variances = [var * count / total_count for var, count in zip(variances, counts)] + merged_stats[feature]['std'] = np.sqrt(np.sum(weighted_variances, axis=0)).tolist() + else: + # Simple average of standard deviations + merged_stats[feature]['std'] = np.mean(padded_stds, axis=0).tolist() + + with open(os.path.join(output_folder, 'meta', 'stats.json'), 'w') as f: + json.dump(merged_stats, f, indent=4) + + # Update and save info.json + info_path = os.path.join(source_folders[0], 'meta', 'info.json') + with open(info_path, 'r') as f: + info = json.load(f) + + # Update info with correct counts + info['total_episodes'] = total_episodes + info['total_frames'] = total_frames + info['total_tasks'] = len(all_tasks) + info['total_chunks'] = (total_episodes + info['chunks_size'] - 1) // info['chunks_size'] # Ceiling division + + # Update splits + info['splits'] = {'train': f'0:{total_episodes}'} + + # Update feature dimensions to the maximum dimension + if 'features' in info: + # Find the maximum dimension across all folders + actual_max_dim = max_dim # 使用变量替代硬编码的18 + for folder, dim in folder_dimensions.items(): + actual_max_dim = max(actual_max_dim, dim) + + # Update observation.state and action dimensions + for feature_name in ['observation.state', 'action']: + if feature_name in info['features']: + # Update shape to the maximum dimension + if 'shape' in info['features'][feature_name]: + info['features'][feature_name]['shape'] = [actual_max_dim] + print(f"Updated {feature_name} shape to {actual_max_dim}") + + # 更新视频总数 + info['total_videos'] = total_videos + print(f"更新视频总数为: {total_videos}") + + with open(os.path.join(output_folder, 'meta', 'info.json'), 'w') as f: + json.dump(info, f, indent=4) + + # Copy video and data files + copy_videos(source_folders, output_folder, episode_mapping) + copy_data_files(source_folders, output_folder, episode_mapping, + max_dim=max_dim, fps=fps, + episode_to_frame_index=episode_to_frame_index, + folder_task_mapping=folder_task_mapping, + chunks_size=chunks_size) + + print(f"Merged {total_episodes} episodes with {total_frames} frames into {output_folder}") + +if __name__ == "__main__": + # Define source folders and output folder + source_folders = ["/data1/realman/put_plastic_to_box/", "/data1/realman/put_metal_to_box/","/data1/realman/Find_blue_box_pick_it_up_put_it_on_yellow_table"] + + # Add all 50 directories to the source_folders list + for i in range(1, 51): # 1 to 50 + folder_name = f"/data1/realman/325_test/Use your left hand to put the parts on the desktop into the box{i}" + source_folders.append(folder_name) + + # Print the number of source folders to verify + print(f"Number of source folders: {len(source_folders)}") + + output_folder = "/data1/realman/lerobot_merged_test/" + + # 设置默认FPS + default_fps = 20 + + # Merge the datasets + 
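After the statistics are merged, the top-level info.json is rewritten with recomputed totals. The sketch below lists the fields the script touches; the numeric values are purely illustrative.

import math

total_episodes, total_frames, total_tasks, total_videos = 53, 41200, 3, 106
chunks_size, max_dim = 1000, 32

info_updates = {
    "total_episodes": total_episodes,
    "total_frames": total_frames,
    "total_tasks": total_tasks,
    "total_videos": total_videos,
    # Ceiling division, equivalent to (total_episodes + chunks_size - 1) // chunks_size
    "total_chunks": math.ceil(total_episodes / chunks_size),
    "splits": {"train": f"0:{total_episodes}"},
}
# The observation.state and action feature shapes are also raised to the merged maximum:
feature_shape_update = {"observation.state": [max_dim], "action": [max_dim]}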
merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps) \ No newline at end of file From 78e988c33b9d264158e3d2142864bd9ce06c92d7 Mon Sep 17 00:00:00 2001 From: zhipeng tang <2444198418@qq.com> Date: Tue, 1 Apr 2025 15:14:11 +0800 Subject: [PATCH 2/9] Update merge.py Bilingual Documentation: Added comprehensive Chinese-English bilingual comments and function documentation to improve international usability. --- lerobot/scripts/merge.py | 437 +++++++++++++++++---------------------- 1 file changed, 191 insertions(+), 246 deletions(-) diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py index c56fb4b3..81107bad 100644 --- a/lerobot/scripts/merge.py +++ b/lerobot/scripts/merge.py @@ -11,7 +11,16 @@ import logging def load_jsonl(file_path): - """Load data from a JSONL file.""" + """ + 从JSONL文件加载数据 + (Load data from a JSONL file) + + Args: + file_path (str): JSONL文件路径 (Path to the JSONL file) + + Returns: + list: 包含文件中每行JSON对象的列表 (List containing JSON objects from each line) + """ data = [] # Special handling for episodes_stats.jsonl @@ -56,13 +65,30 @@ def load_jsonl(file_path): return data def save_jsonl(data, file_path): - """Save data to a JSONL file.""" + """ + 将数据保存为JSONL格式 + (Save data in JSONL format) + + Args: + data (list): 要保存的JSON对象列表 (List of JSON objects to save) + file_path (str): 输出文件路径 (Path to the output file) + """ with open(file_path, 'w') as f: for item in data: f.write(json.dumps(item) + '\n') def merge_stats(stats_list): - """Merge statistics from multiple episodes.""" + """ + 合并多个数据集的统计信息,确保维度一致性 + (Merge statistics from multiple datasets, ensuring dimensional consistency) + + Args: + stats_list (list): 包含每个数据集统计信息的字典列表 + (List of dictionaries containing statistics for each dataset) + + Returns: + dict: 合并后的统计信息 (Merged statistics) + """ # Initialize merged stats with the structure of the first stats merged_stats = {} @@ -284,7 +310,16 @@ def merge_stats(stats_list): return merged_stats def copy_videos(source_folders, output_folder, episode_mapping): - """Copy video files from source folders to output folder with updated episode indices.""" + """ + 从源文件夹复制视频文件到输出文件夹,保持正确的索引和结构 + (Copy video files from source folders to output folder, maintaining correct indices and structure) + + Args: + source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths) + output_folder (str): 输出文件夹路径 (Output folder path) + episode_mapping (list): 包含(旧文件夹,旧索引,新索引)元组的列表 + (List of tuples containing (old_folder, old_index, new_index)) + """ # Get info.json to determine video structure info_path = os.path.join(source_folders[0], 'meta', 'info.json') with open(info_path, 'r') as f: @@ -395,13 +430,24 @@ def copy_videos(source_folders, output_folder, episode_mapping): print(f"Warning: Video file not found for {video_key}, episode {old_index} in {old_folder}") def validate_timestamps(source_folders, tolerance_s=1e-4): - """验证源数据集的时间戳结构,识别潜在问题""" + """ + 验证源数据集的时间戳结构,识别潜在问题 + (Validate timestamp structure of source datasets, identify potential issues) + + Args: + source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths) + tolerance_s (float): 时间戳不连续性的容差值,以秒为单位 (Tolerance for timestamp discontinuities in seconds) + + Returns: + tuple: (issues, fps_values) - 问题列表和检测到的FPS值列表 + (List of issues and list of detected FPS values) + """ issues = [] fps_values = [] for folder in source_folders: try: - # 尝试从 info.json 获取 FPS + # 尝试从 info.json 获取 FPS (Try to get FPS from info.json) info_path = os.path.join(folder, 'meta', 'info.json') if 
os.path.exists(info_path): with open(info_path, 'r') as f: @@ -409,9 +455,9 @@ def validate_timestamps(source_folders, tolerance_s=1e-4): if 'fps' in info: fps = info['fps'] fps_values.append(fps) - print(f"数据集 {folder} FPS={fps}") + print(f"数据集 {folder} FPS={fps} (Dataset {folder} FPS={fps})") - # 检查是否有parquet文件包含时间戳 + # 检查是否有parquet文件包含时间戳 (Check if any parquet files contain timestamps) parquet_path = None for root, _, files in os.walk(os.path.join(folder, "parquet")): for file in files: @@ -434,86 +480,68 @@ def validate_timestamps(source_folders, tolerance_s=1e-4): df = pd.read_parquet(parquet_path) timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col] if timestamp_cols: - print(f"数据集 {folder} 包含时间戳列: {timestamp_cols}") + print(f"数据集 {folder} 包含时间戳列: {timestamp_cols} (Dataset {folder} contains timestamp columns: {timestamp_cols})") else: - issues.append(f"警告: 数据集 {folder} 没有时间戳列") + issues.append(f"警告: 数据集 {folder} 没有时间戳列 (Warning: Dataset {folder} has no timestamp columns)") else: - issues.append(f"警告: 数据集 {folder} 未找到parquet文件") + issues.append(f"警告: 数据集 {folder} 未找到parquet文件 (Warning: No parquet files found in dataset {folder})") except Exception as e: - issues.append(f"错误: 验证数据集 {folder} 失败: {e}") - print(f"验证错误: {e}") + issues.append(f"错误: 验证数据集 {folder} 失败: {e} (Error: Failed to validate dataset {folder}: {e})") + print(f"验证错误: {e} (Validation error: {e})") traceback.print_exc() - # 检查FPS是否一致 + # 检查FPS是否一致 (Check if FPS values are consistent) if len(set(fps_values)) > 1: - issues.append(f"警告: 数据集FPS不一致: {fps_values}") + issues.append(f"警告: 数据集FPS不一致: {fps_values} (Warning: Inconsistent FPS across datasets: {fps_values})") return issues, fps_values -def normalize_timestamps_df(df, fps=None, base_time=None, episode_idx=0, dataset_idx=0, default_fps=20): - """ - 规范化DataFrame中的时间戳,强制创建连续时间序列,同时考虑数据集差异 - """ - normalized_df = df.copy() - - # 找出所有时间戳列 - timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col] - - if not timestamp_cols: - return normalized_df # 没有时间戳列,直接返回 - - # 如果未提供fps,使用默认值 - if fps is None: - fps = default_fps - - # 确定基准时间 - if base_time is None: - # 使用当前时间作为基准,避免使用可能有问题的原始时间戳 - base_time = datetime.now().timestamp() - - # 为每个数据集添加一个较大的时间偏移(例如,每个数据集间隔1天) - dataset_offset = dataset_idx * 86400 # 86400秒 = 1天 - - # 为每个episode计算唯一的起始时间,避免重叠 - frame_interval = 1.0 / fps - episode_duration = len(df) * frame_interval - # 添加10帧的缓冲,确保episodes之间有间隔 - start_time = base_time + dataset_offset + episode_idx * (episode_duration + 10 * frame_interval) - - print(f"数据集 {dataset_idx}, Episode {episode_idx}: 创建连续时间戳,起始于 {start_time}, 间隔 {frame_interval}秒") - - # 对每个时间戳列创建严格递增的时间序列 - for col in timestamp_cols: - normalized_df[col] = [start_time + i * frame_interval for i in range(len(df))] - - return normalized_df - - def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, fps=None, episode_to_frame_index=None, folder_task_mapping=None, chunks_size=1000, default_fps=20): - """复制parquet数据文件""" - # 获取第一个数据集的FPS(如果未提供) + """ + 复制并处理parquet数据文件,包括维度填充和索引更新 + (Copy and process parquet data files, including dimension padding and index updates) + + Args: + source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths) + output_folder (str): 输出文件夹路径 (Output folder path) + episode_mapping (list): 包含(旧文件夹,旧索引,新索引)元组的列表 + (List of tuples containing (old_folder, old_index, new_index)) + max_dim (int): 向量的最大维度 (Maximum dimension for vectors) + fps (float, optional): 帧率,如果未提供则从第一个数据集获取 (Frame rate, if not 
provided will be obtained from the first dataset) + episode_to_frame_index (dict, optional): 每个新episode索引对应的起始帧索引映射 + (Mapping of each new episode index to its starting frame index) + folder_task_mapping (dict, optional): 每个文件夹中task_index的映射关系 + (Mapping of task_index for each folder) + chunks_size (int): 每个chunk包含的episode数量 (Number of episodes per chunk) + default_fps (float): 默认帧率,当无法从数据集获取时使用 (Default frame rate when unable to obtain from dataset) + + Returns: + bool: 是否成功复制了至少一个文件 (Whether at least one file was successfully copied) + """ + # 获取第一个数据集的FPS(如果未提供)(Get FPS from first dataset if not provided) if fps is None: info_path = os.path.join(source_folders[0], 'meta', 'info.json') if os.path.exists(info_path): with open(info_path, 'r') as f: info = json.load(f) - fps = info.get('fps', default_fps) # 使用变量替代硬编码的20 + fps = info.get('fps', default_fps) # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20) else: - fps = default_fps # 使用变量替代硬编码的20 + fps = default_fps # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20) - print(f"使用FPS={fps}") + print(f"使用FPS={fps} (Using FPS={fps})") - # 为每个episode复制和处理数据文件 + # 为每个episode复制和处理数据文件 (Copy and process data files for each episode) total_copied = 0 total_failed = 0 - # 添加一个列表来记录失败的文件及原因 + # 添加一个列表来记录失败的文件及原因 + # (Add a list to record failed files and reasons) failed_files = [] for i, (old_folder, old_index, new_index) in enumerate(episode_mapping): - # 尝试找到源parquet文件 + # 尝试找到源parquet文件 (Try to find source parquet file) episode_str = f"episode_{old_index:06d}.parquet" source_paths = [ os.path.join(old_folder, "parquet", episode_str), @@ -528,22 +556,19 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, if source_path: try: - # 读取parquet文件 + # 读取parquet文件 (Read parquet file) df = pd.read_parquet(source_path) - # 注释掉时间戳规范化 - # df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx) - - # 检查是否需要填充维度 + # 检查是否需要填充维度 (Check if dimensions need padding) for feature in ['observation.state', 'action']: if feature in df.columns: - # 检查第一个非空值 + # 检查第一个非空值 (Check first non-null value) for idx, value in enumerate(df[feature]): if value is not None and isinstance(value, (list, np.ndarray)): current_dim = len(value) if current_dim < max_dim: - print(f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维") - # 使用零填充到目标维度 + print(f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)") + # 使用零填充到目标维度 (Pad with zeros to target dimension) df[feature] = df[feature].apply( lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist() if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim @@ -551,59 +576,57 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, ) break - # 确保目标目录存在 - os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True) - - # 更新episode_index列 + # 更新episode_index列 (Update episode_index column) if 'episode_index' in df.columns: - print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index}") + print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})") df['episode_index'] = new_index - # 更新index列 + # 更新index列 (Update index column) if 'index' in df.columns: if episode_to_frame_index and new_index in episode_to_frame_index: - # 使用预先计算的帧索引起始值 + # 使用预先计算的帧索引起始值 (Use pre-calculated frame index start value) first_index = episode_to_frame_index[new_index] - print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)") + 
print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))") else: # 如果没有提供映射,使用当前的计算方式作为回退 + # (If no mapping provided, use current calculation as fallback) first_index = new_index * len(df) - print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)") + print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)(Update index column, start value: {first_index} (using episode index multiplied by length))") - # 更新所有帧的索引 + # 更新所有帧的索引 (Update indices for all frames) df['index'] = [first_index + i for i in range(len(df))] - # 更新task_index列 + # 更新task_index列 (Update task_index column) if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping: - # 获取当前task_index + # 获取当前task_index (Get current task_index) current_task_index = df['task_index'].iloc[0] - # 检查是否有对应的新索引 + # 检查是否有对应的新索引 (Check if there's a corresponding new index) if current_task_index in folder_task_mapping[old_folder]: new_task_index = folder_task_mapping[old_folder][current_task_index] - print(f"更新task_index从 {current_task_index} 到 {new_task_index}") + print(f"更新task_index从 {current_task_index} 到 {new_task_index} (Update task_index from {current_task_index} to {new_task_index})") df['task_index'] = new_task_index else: - print(f"警告: 找不到task_index {current_task_index}的映射关系") + print(f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})") - # 计算chunk编号 + # 计算chunk编号 (Calculate chunk number) chunk_index = new_index // chunks_size - # 创建正确的目标目录 + # 创建正确的目标目录 (Create correct target directory) chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}") os.makedirs(chunk_dir, exist_ok=True) - # 构建正确的目标路径 + # 构建正确的目标路径 (Build correct target path) dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet") - # 保存到正确位置 + # 保存到正确位置 (Save to correct location) df.to_parquet(dest_path, index=False) total_copied += 1 - print(f"已处理并保存: {dest_path}") + print(f"已处理并保存: {dest_path} (Processed and saved: {dest_path})") except Exception as e: - error_msg = f"处理 {source_path} 失败: {e}" + error_msg = f"处理 {source_path} 失败: {e} (Processing {source_path} failed: {e})" print(error_msg) traceback.print_exc() failed_files.append({"file": source_path, "reason": str(e), "episode": old_index}) @@ -617,66 +640,78 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, try: source_path = os.path.join(root, file) - # 读取parquet文件 + # 读取parquet文件 (Read parquet file) df = pd.read_parquet(source_path) - # 注释掉时间戳规范化 - # df = normalize_timestamps_df(df, fps, base_time, i, dataset_idx) + # 检查是否需要填充维度 (Check if dimensions need padding) + for feature in ['observation.state', 'action']: + if feature in df.columns: + # 检查第一个非空值 (Check first non-null value) + for idx, value in enumerate(df[feature]): + if value is not None and isinstance(value, (list, np.ndarray)): + current_dim = len(value) + if current_dim < max_dim: + print(f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)") + # 使用零填充到目标维度 (Pad with zeros to target dimension) + df[feature] = df[feature].apply( + lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist() + if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim + else x + ) + break - # 确保目标目录存在 - os.makedirs(os.path.join(output_folder, "parquet"), exist_ok=True) - - # 更新episode_index列 + # 更新episode_index列 (Update episode_index column) if 'episode_index' in df.columns: - 
print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index}") + print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})") df['episode_index'] = new_index - # 更新index列 + # 更新index列 (Update index column) if 'index' in df.columns: if episode_to_frame_index and new_index in episode_to_frame_index: - # 使用预先计算的帧索引起始值 + # 使用预先计算的帧索引起始值 (Use pre-calculated frame index start value) first_index = episode_to_frame_index[new_index] - print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)") + print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))") else: # 如果没有提供映射,使用当前的计算方式作为回退 + # (If no mapping provided, use current calculation as fallback) first_index = new_index * len(df) - print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)") + print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)(Update index column, start value: {first_index} (using episode index multiplied by length))") - # 更新所有帧的索引 + # 更新所有帧的索引 (Update indices for all frames) df['index'] = [first_index + i for i in range(len(df))] - # 更新task_index列 + # 更新task_index列 (Update task_index column) if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping: - # 获取当前task_index + # 获取当前task_index (Get current task_index) current_task_index = df['task_index'].iloc[0] - # 检查是否有对应的新索引 + # 检查是否有对应的新索引 (Check if there's a corresponding new index) if current_task_index in folder_task_mapping[old_folder]: new_task_index = folder_task_mapping[old_folder][current_task_index] - print(f"更新task_index从 {current_task_index} 到 {new_task_index}") + print(f"更新task_index从 {current_task_index} 到 {new_task_index} (Update task_index from {current_task_index} to {new_task_index})") df['task_index'] = new_task_index else: - print(f"警告: 找不到task_index {current_task_index}的映射关系") + print(f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})") - # 计算chunk编号 + # 计算chunk编号 (Calculate chunk number) chunk_index = new_index // chunks_size - # 创建正确的目标目录 + # 创建正确的目标目录 (Create correct target directory) chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}") os.makedirs(chunk_dir, exist_ok=True) - # 构建正确的目标路径 + # 构建正确的目标路径 (Build correct target path) dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet") - # 保存到正确位置 + # 保存到正确位置 (Save to correct location) df.to_parquet(dest_path, index=False) total_copied += 1 found = True - print(f"已处理并保存: {dest_path}") + print(f"已处理并保存: {dest_path} (Processed and saved: {dest_path})") break except Exception as e: - error_msg = f"处理 {source_path} 失败: {e}" + error_msg = f"处理 {source_path} 失败: {e} (Processing {source_path} failed: {e})" print(error_msg) traceback.print_exc() failed_files.append({"file": source_path, "reason": str(e), "episode": old_index}) @@ -692,132 +727,30 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, print(f"共复制 {total_copied} 个数据文件,{total_failed} 个失败") - # 打印所有失败的文件详情 + # 打印所有失败的文件详情 (Print details of all failed files) if failed_files: - print("\n失败的文件详情:") + print("\n失败的文件详情 (Details of failed files):") for i, failed in enumerate(failed_files): - print(f"{i+1}. 文件: {failed['file']}") + print(f"{i+1}. 
文件 (File): {failed['file']}") if 'folder' in failed: - print(f" 文件夹: {failed['folder']}") + print(f" 文件夹 (Folder): {failed['folder']}") if 'episode' in failed: - print(f" Episode索引: {failed['episode']}") - print(f" 原因: {failed['reason']}") + print(f" Episode索引 (Episode index): {failed['episode']}") + print(f" 原因 (Reason): {failed['reason']}") print("---") return total_copied > 0 -def copy_data_files_bak(source_folders, output_folder, episode_mapping): - """Copy data files from source folders to output folder with updated episode indices.""" - # Get info.json to determine data structure - info_path = os.path.join(source_folders[0], 'meta', 'info.json') - with open(info_path, 'r') as f: - info = json.load(f) - - data_path_template = info['data_path'] - - # Process each episode mapping - for old_folder, old_index, new_index in episode_mapping: - # Calculate chunk indices - old_episode_chunk = old_index // info['chunks_size'] - new_episode_chunk = new_index // info['chunks_size'] - - # Construct source path - source_data_path = os.path.join( - old_folder, - data_path_template.format( - episode_chunk=old_episode_chunk, - episode_index=old_index - ) - ) - - # Construct destination path - dest_data_path = os.path.join( - output_folder, - data_path_template.format( - episode_chunk=new_episode_chunk, - episode_index=new_index - ) - ) - - # Create destination directory if it doesn't exist - os.makedirs(os.path.dirname(dest_data_path), exist_ok=True) - - # Check if source file exists - if os.path.exists(source_data_path): - print(f"Copying data: {source_data_path} -> {dest_data_path}") - - # Check if we need to pad dimensions - try: - # Read the source parquet file to check dimensions - df = pd.read_parquet(source_data_path) - - # Check if observation.state is a vector and needs padding - needs_padding = False - if 'observation.state' in df.columns: - first_state = df['observation.state'].iloc[0] - if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18: - needs_padding = True - - if needs_padding: - # This needs padding to 18-dim - pad_parquet_data(source_data_path, dest_data_path, len(first_state), 18) - else: - # Just copy the file - shutil.copy2(source_data_path, dest_data_path) - except Exception as e: - print(f"Error processing {source_data_path}: {e}") - # Fall back to simple copy - shutil.copy2(source_data_path, dest_data_path) - else: - print(f"Warning: {source_data_path} not found, searching for parquet files in {old_folder}") - # Try to find the file by searching - found = False - for root, dirs, files in os.walk(old_folder): - for file in files: - if file.endswith('.parquet'): - source_data_path = os.path.join(root, file) - print(f"Found parquet file: {source_data_path}") - - # Create a destination path based on the found file - rel_path = os.path.relpath(source_data_path, old_folder) - dest_data_path = os.path.join(output_folder, rel_path) - - # Ensure destination directory exists - os.makedirs(os.path.dirname(dest_data_path), exist_ok=True) - - # Check if we need to pad dimensions - try: - # Read the source parquet file to check dimensions - df = pd.read_parquet(source_data_path) - - # Check if observation.state is a vector and needs padding - needs_padding = False - if 'observation.state' in df.columns: - first_state = df['observation.state'].iloc[0] - if isinstance(first_state, (list, np.ndarray)) and len(first_state) < 18: - needs_padding = True - - if needs_padding: - # This needs padding to 18-dim - pad_parquet_data(source_data_path, dest_data_path, 
len(first_state), 18)
-                        else:
-                            # Just copy the file
-                            shutil.copy2(source_data_path, dest_data_path)
-
-                        found = True
-                    except Exception as e:
-                        print(f"Error processing {source_data_path}: {e}")
-                        # Fall back to simple copy
-                        shutil.copy2(source_data_path, dest_data_path)
-                        found = True
-
-        if not found:
-            print(f"Warning: Could not find any parquet file in {old_folder}")
-
 
 def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18):
     """
-    读取parquet文件,将数据从original_dim维填充到target_dim维,
-    填充的值为0,然后保存到新的parquet文件。
+    通过零填充将parquet数据从原始维度扩展到目标维度
+    (Extend parquet data from original dimension to target dimension by zero-padding)
+
+    Args:
+        source_path (str): 源parquet文件路径 (Source parquet file path)
+        target_path (str): 目标parquet文件路径 (Target parquet file path)
+        original_dim (int): 原始向量维度 (Original vector dimension)
+        target_dim (int): 目标向量维度 (Target vector dimension)
     """
     # 读取parquet文件
     df = pd.read_parquet(source_path)
@@ -877,7 +810,26 @@ def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18):
     return new_df
 
 def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, max_dim=18, default_fps=20):
-    """Merge multiple dataset folders into one."""
+    """
+    将多个数据集文件夹合并为一个,处理索引、维度和元数据
+    (Merge multiple dataset folders into one, handling indices, dimensions, and metadata)
+
+    Args:
+        source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths)
+        output_folder (str): 输出文件夹路径 (Output folder path)
+        validate_ts (bool): 是否验证时间戳 (Whether to validate timestamps)
+        tolerance_s (float): 时间戳不连续性的容差值,以秒为单位 (Tolerance for timestamp discontinuities in seconds)
+        max_dim (int): 向量的最大维度 (Maximum dimension for vectors)
+        default_fps (float): 默认帧率 (Default frame rate)
+
+    这个函数执行以下操作:
+    (This function performs the following operations:)
+    1. 合并所有的episodes、tasks和stats (Merges all episodes, tasks and stats)
+    2. 重新编号所有的索引以保持连续性 (Renumbers all indices to maintain continuity)
+    3. 填充向量维度使其一致 (Pads vector dimensions for consistency)
+    4. 更新元数据文件 (Updates metadata files)
+    5. 复制并处理数据和视频文件 (Copies and processes data and video files)
+    """
     # Create output folder if it doesn't exist
     os.makedirs(output_folder, exist_ok=True)
     os.makedirs(os.path.join(output_folder, 'meta'), exist_ok=True)
@@ -940,12 +892,13 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
             info = json.load(f)
         chunks_size = info.get('chunks_size', 1000)
 
-    # 使用更简单的方法计算视频总数
+    # 使用更简单的方法计算视频总数 (Use simpler method to calculate total videos)
     total_videos = 0
 
     for folder in source_folders:
         try:
             # 从每个数据集的info.json直接获取total_videos
+            # (Get total_videos directly from each dataset's info.json)
             folder_info_path = os.path.join(folder, 'meta', 'info.json')
             if os.path.exists(folder_info_path):
                 with open(folder_info_path, 'r') as f:
@@ -953,7 +906,7 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
                     if 'total_videos' in folder_info:
                         folder_videos = folder_info['total_videos']
                         total_videos += folder_videos
-                        print(f"从{folder}的info.json中读取到视频数量: {folder_videos}")
+                        print(f"从{folder}的info.json中读取到视频数量: {folder_videos} (Read video count from {folder}'s info.json: {folder_videos})")
 
             # Check dimensions of state vectors in this folder
             folder_dim = max_dim # 使用变量替代硬编码的18
@@ -1217,9 +1170,9 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s
                     info['features'][feature_name]['shape'] = [actual_max_dim]
                     print(f"Updated {feature_name} shape to {actual_max_dim}")
 
-    # 更新视频总数
+    # 更新视频总数 (Update total videos)
     info['total_videos'] = total_videos
-    print(f"更新视频总数为: {total_videos}")
+    print(f"更新视频总数为: {total_videos} (Update total videos to: {total_videos})")
 
     with open(os.path.join(output_folder, 'meta', 'info.json'), 'w') as f:
         json.dump(info, f, indent=4)
@@ -1236,15 +1189,7 @@ if __name__ == "__main__":
     # Define source folders and output folder
-    source_folders = ["/data1/realman/put_plastic_to_box/", "/data1/realman/put_metal_to_box/","/data1/realman/Find_blue_box_pick_it_up_put_it_on_yellow_table"]
-
-    # Add all 50 directories to the source_folders list
-    for i in range(1, 51): # 1 to 50
-        folder_name = f"/data1/realman/325_test/Use your left hand to put the parts on the desktop into the box{i}"
-        source_folders.append(folder_name)
-
-    # Print the number of source folders to verify
-    print(f"Number of source folders: {len(source_folders)}")
+    source_folders = ["/path/to/put_plastic_to_box/", "/path/to/put_metal_to_box/","/path/to/Find_blue_box"]
 
     output_folder = "/data1/realman/lerobot_merged_test/"
 
@@ -1252,4 +1197,4 @@ if __name__ == "__main__":
     default_fps = 20
 
     # Merge the datasets
-    merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps)
\ No newline at end of file
+    merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps)

From e750f7379e6102ff8efaff4b04dbbdfb053607a9 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 1 Apr 2025 07:22:58 +0000
Subject: [PATCH 3/9] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 lerobot/scripts/merge.py | 951 ++++++++++++++++++++++-----------------
 1 file changed, 530 insertions(+), 421 deletions(-)

diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py
index 81107bad..6763ecf8 100644
--- a/lerobot/scripts/merge.py
+++ b/lerobot/scripts/merge.py
@@ -1,49 +1,46 @@
 import json
 import os
 import shutil
-from pathlib import Path
+import 
traceback + import numpy as np import pandas as pd -import pandas as pd -from datetime import datetime -import traceback -import logging def load_jsonl(file_path): """ 从JSONL文件加载数据 (Load data from a JSONL file) - + Args: file_path (str): JSONL文件路径 (Path to the JSONL file) - + Returns: list: 包含文件中每行JSON对象的列表 (List containing JSON objects from each line) """ data = [] - + # Special handling for episodes_stats.jsonl - if 'episodes_stats.jsonl' in file_path: + if "episodes_stats.jsonl" in file_path: try: # Try to load the entire file as a JSON array - with open(file_path, 'r') as f: + with open(file_path) as f: content = f.read() # Check if the content starts with '[' and ends with ']' - if content.strip().startswith('[') and content.strip().endswith(']'): + if content.strip().startswith("[") and content.strip().endswith("]"): return json.loads(content) else: # Try to add brackets and parse try: - return json.loads('[' + content + ']') + return json.loads("[" + content + "]") except json.JSONDecodeError: pass except Exception as e: print(f"Error loading {file_path} as JSON array: {e}") - + # Fall back to line-by-line parsing try: - with open(file_path, 'r') as f: + with open(file_path) as f: for line in f: if line.strip(): try: @@ -54,158 +51,180 @@ def load_jsonl(file_path): print(f"Error loading {file_path} line by line: {e}") else: # Standard JSONL parsing for other files - with open(file_path, 'r') as f: + with open(file_path) as f: for line in f: if line.strip(): try: data.append(json.loads(line)) except json.JSONDecodeError: print(f"Warning: Could not parse line in {file_path}: {line[:100]}...") - + return data + def save_jsonl(data, file_path): """ 将数据保存为JSONL格式 (Save data in JSONL format) - + Args: data (list): 要保存的JSON对象列表 (List of JSON objects to save) file_path (str): 输出文件路径 (Path to the output file) """ - with open(file_path, 'w') as f: + with open(file_path, "w") as f: for item in data: - f.write(json.dumps(item) + '\n') + f.write(json.dumps(item) + "\n") + def merge_stats(stats_list): """ 合并多个数据集的统计信息,确保维度一致性 (Merge statistics from multiple datasets, ensuring dimensional consistency) - + Args: stats_list (list): 包含每个数据集统计信息的字典列表 (List of dictionaries containing statistics for each dataset) - + Returns: dict: 合并后的统计信息 (Merged statistics) """ # Initialize merged stats with the structure of the first stats merged_stats = {} - + # Find common features across all stats common_features = set(stats_list[0].keys()) for stats in stats_list[1:]: common_features = common_features.intersection(set(stats.keys())) - + # Process features in the order they appear in the first stats file for feature in stats_list[0].keys(): if feature not in common_features: continue - + merged_stats[feature] = {} - + # Find common stat types for this feature common_stat_types = [] - for stat_type in ['mean', 'std', 'max', 'min']: + for stat_type in ["mean", "std", "max", "min"]: if all(stat_type in stats[feature] for stats in stats_list): common_stat_types.append(stat_type) - + # Determine the original shape of each value original_shapes = [] for stats in stats_list: - if 'mean' in stats[feature]: - shape = np.array(stats[feature]['mean']).shape + if "mean" in stats[feature]: + shape = np.array(stats[feature]["mean"]).shape original_shapes.append(shape) - + # Special handling for image features to preserve nested structure - if feature.startswith('observation.images.'): + if feature.startswith("observation.images."): for stat_type in common_stat_types: try: # Get all values values = [stats[feature][stat_type] 
for stats in stats_list] - + # For image features, we need to preserve the nested structure # Initialize with the first value's structure result = [] - + # For RGB channels for channel_idx in range(len(values[0])): channel_result = [] - + # For each pixel row for pixel_idx in range(len(values[0][channel_idx])): pixel_result = [] - + # For each pixel value for value_idx in range(len(values[0][channel_idx][pixel_idx])): # Calculate statistic based on type - if stat_type == 'mean': + if stat_type == "mean": # Simple average - avg = sum(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) / len(values) + avg = sum( + values[i][channel_idx][pixel_idx][value_idx] + for i in range(len(values)) + ) / len(values) pixel_result.append(avg) - elif stat_type == 'std': + elif stat_type == "std": # Simple average of std - avg = sum(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) / len(values) + avg = sum( + values[i][channel_idx][pixel_idx][value_idx] + for i in range(len(values)) + ) / len(values) pixel_result.append(avg) - elif stat_type == 'max': + elif stat_type == "max": # Maximum - max_val = max(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) + max_val = max( + values[i][channel_idx][pixel_idx][value_idx] + for i in range(len(values)) + ) pixel_result.append(max_val) - elif stat_type == 'min': + elif stat_type == "min": # Minimum - min_val = min(values[i][channel_idx][pixel_idx][value_idx] for i in range(len(values))) + min_val = min( + values[i][channel_idx][pixel_idx][value_idx] + for i in range(len(values)) + ) pixel_result.append(min_val) - + channel_result.append(pixel_result) - + result.append(channel_result) - + merged_stats[feature][stat_type] = result except Exception as e: print(f"Warning: Error processing image feature {feature}.{stat_type}: {e}") # Fallback to first value merged_stats[feature][stat_type] = values[0] # If all shapes are the same, no need for special handling - elif len(set(str(shape) for shape in original_shapes)) == 1: + elif len({str(shape) for shape in original_shapes}) == 1: # All shapes are the same, use standard merging for stat_type in common_stat_types: values = [stats[feature][stat_type] for stats in stats_list] - + try: # Calculate the new statistic based on the type - if stat_type == 'mean': - if all('count' in stats[feature] for stats in stats_list): - counts = [stats[feature]['count'][0] for stats in stats_list] + if stat_type == "mean": + if all("count" in stats[feature] for stats in stats_list): + counts = [stats[feature]["count"][0] for stats in stats_list] total_count = sum(counts) - weighted_values = [np.array(val) * count / total_count for val, count in zip(values, counts)] + weighted_values = [ + np.array(val) * count / total_count + for val, count in zip(values, counts, strict=False) + ] merged_stats[feature][stat_type] = np.sum(weighted_values, axis=0).tolist() else: merged_stats[feature][stat_type] = np.mean(np.array(values), axis=0).tolist() - - elif stat_type == 'std': - if all('count' in stats[feature] for stats in stats_list): - counts = [stats[feature]['count'][0] for stats in stats_list] + + elif stat_type == "std": + if all("count" in stats[feature] for stats in stats_list): + counts = [stats[feature]["count"][0] for stats in stats_list] total_count = sum(counts) - variances = [np.array(std)**2 for std in values] - weighted_variances = [var * count / total_count for var, count in zip(variances, counts)] - merged_stats[feature][stat_type] = 
np.sqrt(np.sum(weighted_variances, axis=0)).tolist() + variances = [np.array(std) ** 2 for std in values] + weighted_variances = [ + var * count / total_count + for var, count in zip(variances, counts, strict=False) + ] + merged_stats[feature][stat_type] = np.sqrt( + np.sum(weighted_variances, axis=0) + ).tolist() else: merged_stats[feature][stat_type] = np.mean(np.array(values), axis=0).tolist() - - elif stat_type == 'max': + + elif stat_type == "max": merged_stats[feature][stat_type] = np.maximum.reduce(np.array(values)).tolist() - - elif stat_type == 'min': + + elif stat_type == "min": merged_stats[feature][stat_type] = np.minimum.reduce(np.array(values)).tolist() except Exception as e: print(f"Warning: Error processing {feature}.{stat_type}: {e}") continue else: # Shapes are different, need special handling for state vectors - if feature in ['observation.state', 'action']: + if feature in ["observation.state", "action"]: # For state vectors, we need to handle different dimensions - max_dim = max(len(np.array(stats[feature]['mean']).flatten()) for stats in stats_list) - + max_dim = max(len(np.array(stats[feature]["mean"]).flatten()) for stats in stats_list) + for stat_type in common_stat_types: try: # Get values and their original dimensions @@ -214,27 +233,29 @@ def merge_stats(stats_list): val = np.array(stats[feature][stat_type]).flatten() dim = len(val) values_with_dims.append((val, dim)) - + # Initialize result array with zeros result = np.zeros(max_dim) - + # Calculate statistics for each dimension separately - if stat_type == 'mean': - if all('count' in stats[feature] for stats in stats_list): - counts = [stats[feature]['count'][0] for stats in stats_list] + if stat_type == "mean": + if all("count" in stats[feature] for stats in stats_list): + counts = [stats[feature]["count"][0] for stats in stats_list] total_count = sum(counts) - + # For each dimension, calculate weighted mean of available values for d in range(max_dim): dim_values = [] dim_weights = [] - for (val, dim), count in zip(values_with_dims, counts): + for (val, dim), count in zip(values_with_dims, counts, strict=False): if d < dim: # Only use values that have this dimension dim_values.append(val[d]) dim_weights.append(count) - + if dim_values: # If we have values for this dimension - weighted_sum = sum(v * w for v, w in zip(dim_values, dim_weights)) + weighted_sum = sum( + v * w for v, w in zip(dim_values, dim_weights, strict=False) + ) result[d] = weighted_sum / sum(dim_weights) else: # Simple average for each dimension @@ -242,23 +263,25 @@ def merge_stats(stats_list): dim_values = [val[d] for val, dim in values_with_dims if d < dim] if dim_values: result[d] = sum(dim_values) / len(dim_values) - - elif stat_type == 'std': - if all('count' in stats[feature] for stats in stats_list): - counts = [stats[feature]['count'][0] for stats in stats_list] + + elif stat_type == "std": + if all("count" in stats[feature] for stats in stats_list): + counts = [stats[feature]["count"][0] for stats in stats_list] total_count = sum(counts) - + # For each dimension, calculate weighted variance for d in range(max_dim): dim_variances = [] dim_weights = [] - for (val, dim), count in zip(values_with_dims, counts): + for (val, dim), count in zip(values_with_dims, counts, strict=False): if d < dim: # Only use values that have this dimension - dim_variances.append(val[d]**2) # Square for variance + dim_variances.append(val[d] ** 2) # Square for variance dim_weights.append(count) - + if dim_variances: # If we have values for this dimension - 
weighted_var = sum(v * w for v, w in zip(dim_variances, dim_weights)) / sum(dim_weights) + weighted_var = sum( + v * w for v, w in zip(dim_variances, dim_weights, strict=False) + ) / sum(dim_weights) result[d] = np.sqrt(weighted_var) # Take sqrt for std else: # Simple average of std for each dimension @@ -266,54 +289,59 @@ def merge_stats(stats_list): dim_values = [val[d] for val, dim in values_with_dims if d < dim] if dim_values: result[d] = sum(dim_values) / len(dim_values) - - elif stat_type == 'max': + + elif stat_type == "max": # For each dimension, take the maximum of available values for d in range(max_dim): dim_values = [val[d] for val, dim in values_with_dims if d < dim] if dim_values: result[d] = max(dim_values) - - elif stat_type == 'min': + + elif stat_type == "min": # For each dimension, take the minimum of available values for d in range(max_dim): dim_values = [val[d] for val, dim in values_with_dims if d < dim] if dim_values: result[d] = min(dim_values) - + # Convert result to list and store merged_stats[feature][stat_type] = result.tolist() - + except Exception as e: - print(f"Warning: Error processing {feature}.{stat_type} with different dimensions: {e}") + print( + f"Warning: Error processing {feature}.{stat_type} with different dimensions: {e}" + ) continue else: # For other features with different shapes, use the first shape as template template_shape = original_shapes[0] print(f"Using shape {template_shape} as template for {feature}") - + for stat_type in common_stat_types: try: # Use the first stats as template merged_stats[feature][stat_type] = stats_list[0][feature][stat_type] except Exception as e: - print(f"Warning: Error processing {feature}.{stat_type} with shape {template_shape}: {e}") + print( + f"Warning: Error processing {feature}.{stat_type} with shape {template_shape}: {e}" + ) continue - + # Add count if available in all stats - if all('count' in stats[feature] for stats in stats_list): + if all("count" in stats[feature] for stats in stats_list): try: - merged_stats[feature]['count'] = [sum(stats[feature]['count'][0] for stats in stats_list)] + merged_stats[feature]["count"] = [sum(stats[feature]["count"][0] for stats in stats_list)] except Exception as e: print(f"Warning: Error processing {feature}.count: {e}") - + return merged_stats + def copy_videos(source_folders, output_folder, episode_mapping): """ 从源文件夹复制视频文件到输出文件夹,保持正确的索引和结构 (Copy video files from source folders to output folder, maintaining correct indices and structure) - + Args: source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths) output_folder (str): 输出文件夹路径 (Output folder path) @@ -321,81 +349,69 @@ def copy_videos(source_folders, output_folder, episode_mapping): (List of tuples containing (old_folder, old_index, new_index)) """ # Get info.json to determine video structure - info_path = os.path.join(source_folders[0], 'meta', 'info.json') - with open(info_path, 'r') as f: + info_path = os.path.join(source_folders[0], "meta", "info.json") + with open(info_path) as f: info = json.load(f) - - video_path_template = info['video_path'] - + + video_path_template = info["video_path"] + # Identify video keys from the template # Example: "videos/chunk-{episode_chunk:03d}/{video_key}/episode_{episode_index:06d}.mp4" video_keys = [] - for feature_name, feature_info in info['features'].items(): - if feature_info.get('dtype') == 'video': + for feature_name, feature_info in info["features"].items(): + if feature_info.get("dtype") == "video": # Use the full feature name as the video key 
video_keys.append(feature_name) - + print(f"Found video keys: {video_keys}") - + # Copy videos for each episode for old_folder, old_index, new_index in episode_mapping: # Determine episode chunk (usually 0 for small datasets) - episode_chunk = old_index // info['chunks_size'] - new_episode_chunk = new_index // info['chunks_size'] - + episode_chunk = old_index // info["chunks_size"] + new_episode_chunk = new_index // info["chunks_size"] + for video_key in video_keys: # Try different possible source paths source_patterns = [ # Standard path with the episode index from metadata os.path.join( - old_folder, + old_folder, video_path_template.format( - episode_chunk=episode_chunk, - video_key=video_key, - episode_index=old_index - ) + episode_chunk=episode_chunk, video_key=video_key, episode_index=old_index + ), ), # Try with 0-based indexing os.path.join( - old_folder, - video_path_template.format( - episode_chunk=0, - video_key=video_key, - episode_index=0 - ) + old_folder, + video_path_template.format(episode_chunk=0, video_key=video_key, episode_index=0), ), # Try with different formatting os.path.join( - old_folder, - f"videos/chunk-{episode_chunk:03d}/{video_key}/episode_{old_index}.mp4" - ), - os.path.join( - old_folder, - f"videos/chunk-000/{video_key}/episode_000000.mp4" + old_folder, f"videos/chunk-{episode_chunk:03d}/{video_key}/episode_{old_index}.mp4" ), + os.path.join(old_folder, f"videos/chunk-000/{video_key}/episode_000000.mp4"), ] - + # Find the first existing source path source_video_path = None for pattern in source_patterns: if os.path.exists(pattern): source_video_path = pattern break - + if source_video_path: # Construct destination path dest_video_path = os.path.join( - output_folder, + output_folder, video_path_template.format( - episode_chunk=new_episode_chunk, - video_key=video_key, - episode_index=new_index - ) + episode_chunk=new_episode_chunk, video_key=video_key, episode_index=new_index + ), ) - + # Create destination directory if it doesn't exist os.makedirs(os.path.dirname(dest_video_path), exist_ok=True) - + print(f"Copying video: {source_video_path} -> {dest_video_path}") shutil.copy2(source_video_path, dest_video_path) else: @@ -403,106 +419,131 @@ def copy_videos(source_folders, output_folder, episode_mapping): found = False for root, _, files in os.walk(os.path.join(old_folder, "videos")): for file in files: - if file.endswith('.mp4') and video_key in root: + if file.endswith(".mp4") and video_key in root: source_video_path = os.path.join(root, file) - + # Construct destination path dest_video_path = os.path.join( - output_folder, + output_folder, video_path_template.format( episode_chunk=new_episode_chunk, video_key=video_key, - episode_index=new_index - ) + episode_index=new_index, + ), ) - + # Create destination directory if it doesn't exist os.makedirs(os.path.dirname(dest_video_path), exist_ok=True) - - print(f"Copying video (found by search): {source_video_path} -> {dest_video_path}") + + print( + f"Copying video (found by search): {source_video_path} -> {dest_video_path}" + ) shutil.copy2(source_video_path, dest_video_path) found = True break if found: break - + if not found: - print(f"Warning: Video file not found for {video_key}, episode {old_index} in {old_folder}") + print( + f"Warning: Video file not found for {video_key}, episode {old_index} in {old_folder}" + ) + def validate_timestamps(source_folders, tolerance_s=1e-4): """ 验证源数据集的时间戳结构,识别潜在问题 (Validate timestamp structure of source datasets, identify potential issues) - + Args: source_folders 
(list): 源数据集文件夹路径列表 (List of source dataset folder paths) tolerance_s (float): 时间戳不连续性的容差值,以秒为单位 (Tolerance for timestamp discontinuities in seconds) - + Returns: tuple: (issues, fps_values) - 问题列表和检测到的FPS值列表 (List of issues and list of detected FPS values) """ issues = [] fps_values = [] - + for folder in source_folders: try: # 尝试从 info.json 获取 FPS (Try to get FPS from info.json) - info_path = os.path.join(folder, 'meta', 'info.json') + info_path = os.path.join(folder, "meta", "info.json") if os.path.exists(info_path): - with open(info_path, 'r') as f: + with open(info_path) as f: info = json.load(f) - if 'fps' in info: - fps = info['fps'] + if "fps" in info: + fps = info["fps"] fps_values.append(fps) print(f"数据集 {folder} FPS={fps} (Dataset {folder} FPS={fps})") - + # 检查是否有parquet文件包含时间戳 (Check if any parquet files contain timestamps) parquet_path = None for root, _, files in os.walk(os.path.join(folder, "parquet")): for file in files: - if file.endswith('.parquet'): + if file.endswith(".parquet"): parquet_path = os.path.join(root, file) break if parquet_path: break - + if not parquet_path: for root, _, files in os.walk(os.path.join(folder, "data")): for file in files: - if file.endswith('.parquet'): + if file.endswith(".parquet"): parquet_path = os.path.join(root, file) break if parquet_path: break - + if parquet_path: df = pd.read_parquet(parquet_path) - timestamp_cols = [col for col in df.columns if 'timestamp' in col or 'time' in col] + timestamp_cols = [col for col in df.columns if "timestamp" in col or "time" in col] if timestamp_cols: - print(f"数据集 {folder} 包含时间戳列: {timestamp_cols} (Dataset {folder} contains timestamp columns: {timestamp_cols})") + print( + f"数据集 {folder} 包含时间戳列: {timestamp_cols} (Dataset {folder} contains timestamp columns: {timestamp_cols})" + ) else: - issues.append(f"警告: 数据集 {folder} 没有时间戳列 (Warning: Dataset {folder} has no timestamp columns)") + issues.append( + f"警告: 数据集 {folder} 没有时间戳列 (Warning: Dataset {folder} has no timestamp columns)" + ) else: - issues.append(f"警告: 数据集 {folder} 未找到parquet文件 (Warning: No parquet files found in dataset {folder})") - + issues.append( + f"警告: 数据集 {folder} 未找到parquet文件 (Warning: No parquet files found in dataset {folder})" + ) + except Exception as e: - issues.append(f"错误: 验证数据集 {folder} 失败: {e} (Error: Failed to validate dataset {folder}: {e})") + issues.append( + f"错误: 验证数据集 {folder} 失败: {e} (Error: Failed to validate dataset {folder}: {e})" + ) print(f"验证错误: {e} (Validation error: {e})") traceback.print_exc() - + # 检查FPS是否一致 (Check if FPS values are consistent) if len(set(fps_values)) > 1: - issues.append(f"警告: 数据集FPS不一致: {fps_values} (Warning: Inconsistent FPS across datasets: {fps_values})") - + issues.append( + f"警告: 数据集FPS不一致: {fps_values} (Warning: Inconsistent FPS across datasets: {fps_values})" + ) + return issues, fps_values -def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, fps=None, - episode_to_frame_index=None, folder_task_mapping=None, chunks_size=1000, default_fps=20): + +def copy_data_files( + source_folders, + output_folder, + episode_mapping, + max_dim=18, + fps=None, + episode_to_frame_index=None, + folder_task_mapping=None, + chunks_size=1000, + default_fps=20, +): """ 复制并处理parquet数据文件,包括维度填充和索引更新 (Copy and process parquet data files, including dimension padding and index updates) - + Args: source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths) output_folder (str): 输出文件夹路径 (Output folder path) @@ -516,115 +557,131 @@ def copy_data_files(source_folders, 
output_folder, episode_mapping, max_dim=18, (Mapping of task_index for each folder) chunks_size (int): 每个chunk包含的episode数量 (Number of episodes per chunk) default_fps (float): 默认帧率,当无法从数据集获取时使用 (Default frame rate when unable to obtain from dataset) - + Returns: bool: 是否成功复制了至少一个文件 (Whether at least one file was successfully copied) """ # 获取第一个数据集的FPS(如果未提供)(Get FPS from first dataset if not provided) if fps is None: - info_path = os.path.join(source_folders[0], 'meta', 'info.json') + info_path = os.path.join(source_folders[0], "meta", "info.json") if os.path.exists(info_path): - with open(info_path, 'r') as f: + with open(info_path) as f: info = json.load(f) - fps = info.get('fps', default_fps) # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20) + fps = info.get( + "fps", default_fps + ) # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20) else: fps = default_fps # 使用变量替代硬编码的20 (Use variable instead of hardcoded 20) print(f"使用FPS={fps} (Using FPS={fps})") - + # 为每个episode复制和处理数据文件 (Copy and process data files for each episode) total_copied = 0 total_failed = 0 - - # 添加一个列表来记录失败的文件及原因 + + # 添加一个列表来记录失败的文件及原因 # (Add a list to record failed files and reasons) failed_files = [] - + for i, (old_folder, old_index, new_index) in enumerate(episode_mapping): # 尝试找到源parquet文件 (Try to find source parquet file) episode_str = f"episode_{old_index:06d}.parquet" source_paths = [ os.path.join(old_folder, "parquet", episode_str), - os.path.join(old_folder, "data", episode_str) + os.path.join(old_folder, "data", episode_str), ] - + source_path = None for path in source_paths: if os.path.exists(path): source_path = path break - + if source_path: try: # 读取parquet文件 (Read parquet file) df = pd.read_parquet(source_path) - + # 检查是否需要填充维度 (Check if dimensions need padding) - for feature in ['observation.state', 'action']: + for feature in ["observation.state", "action"]: if feature in df.columns: # 检查第一个非空值 (Check first non-null value) for idx, value in enumerate(df[feature]): if value is not None and isinstance(value, (list, np.ndarray)): current_dim = len(value) if current_dim < max_dim: - print(f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)") + print( + f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)" + ) # 使用零填充到目标维度 (Pad with zeros to target dimension) df[feature] = df[feature].apply( - lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist() - if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim + lambda x: np.pad(x, (0, max_dim - len(x)), "constant").tolist() + if x is not None + and isinstance(x, (list, np.ndarray)) + and len(x) < max_dim else x ) break - + # 更新episode_index列 (Update episode_index column) - if 'episode_index' in df.columns: - print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})") - df['episode_index'] = new_index - + if "episode_index" in df.columns: + print( + f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})" + ) + df["episode_index"] = new_index + # 更新index列 (Update index column) - if 'index' in df.columns: + if "index" in df.columns: if episode_to_frame_index and new_index in episode_to_frame_index: # 使用预先计算的帧索引起始值 (Use pre-calculated frame index start value) first_index = episode_to_frame_index[new_index] - print(f"更新index列,起始值: 
{first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))") + print( + f"更新index列,起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))" + ) else: # 如果没有提供映射,使用当前的计算方式作为回退 # (If no mapping provided, use current calculation as fallback) first_index = new_index * len(df) - print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)(Update index column, start value: {first_index} (using episode index multiplied by length))") - + print( + f"更新index列,起始值: {first_index}(使用episode索引乘以长度)(Update index column, start value: {first_index} (using episode index multiplied by length))" + ) + # 更新所有帧的索引 (Update indices for all frames) - df['index'] = [first_index + i for i in range(len(df))] - + df["index"] = [first_index + i for i in range(len(df))] + # 更新task_index列 (Update task_index column) - if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping: + if "task_index" in df.columns and folder_task_mapping and old_folder in folder_task_mapping: # 获取当前task_index (Get current task_index) - current_task_index = df['task_index'].iloc[0] - + current_task_index = df["task_index"].iloc[0] + # 检查是否有对应的新索引 (Check if there's a corresponding new index) if current_task_index in folder_task_mapping[old_folder]: new_task_index = folder_task_mapping[old_folder][current_task_index] - print(f"更新task_index从 {current_task_index} 到 {new_task_index} (Update task_index from {current_task_index} to {new_task_index})") - df['task_index'] = new_task_index + print( + f"更新task_index从 {current_task_index} 到 {new_task_index} (Update task_index from {current_task_index} to {new_task_index})" + ) + df["task_index"] = new_task_index else: - print(f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})") - + print( + f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})" + ) + # 计算chunk编号 (Calculate chunk number) chunk_index = new_index // chunks_size - + # 创建正确的目标目录 (Create correct target directory) chunk_dir = os.path.join(output_folder, "data", f"chunk-{chunk_index:03d}") os.makedirs(chunk_dir, exist_ok=True) - + # 构建正确的目标路径 (Build correct target path) dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet") - + # 保存到正确位置 (Save to correct location) df.to_parquet(dest_path, index=False) - + total_copied += 1 print(f"已处理并保存: {dest_path} (Processed and saved: {dest_path})") - + except Exception as e: error_msg = f"处理 {source_path} 失败: {e} (Processing {source_path} failed: {e})" print(error_msg) @@ -636,76 +693,96 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, found = False for root, _, files in os.walk(old_folder): for file in files: - if file.endswith('.parquet') and f"episode_{old_index:06d}" in file: + if file.endswith(".parquet") and f"episode_{old_index:06d}" in file: try: source_path = os.path.join(root, file) - + # 读取parquet文件 (Read parquet file) df = pd.read_parquet(source_path) - + # 检查是否需要填充维度 (Check if dimensions need padding) - for feature in ['observation.state', 'action']: + for feature in ["observation.state", "action"]: if feature in df.columns: # 检查第一个非空值 (Check first non-null value) for idx, value in enumerate(df[feature]): if value is not None and isinstance(value, (list, np.ndarray)): current_dim = len(value) if current_dim < max_dim: - print(f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from 
{current_dim} to {max_dim} dimensions)") + print( + f"填充 {feature} 从 {current_dim} 维到 {max_dim} 维 (Padding {feature} from {current_dim} to {max_dim} dimensions)" + ) # 使用零填充到目标维度 (Pad with zeros to target dimension) df[feature] = df[feature].apply( - lambda x: np.pad(x, (0, max_dim - len(x)), 'constant').tolist() - if x is not None and isinstance(x, (list, np.ndarray)) and len(x) < max_dim + lambda x: np.pad( + x, (0, max_dim - len(x)), "constant" + ).tolist() + if x is not None + and isinstance(x, (list, np.ndarray)) + and len(x) < max_dim else x ) break - + # 更新episode_index列 (Update episode_index column) - if 'episode_index' in df.columns: - print(f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})") - df['episode_index'] = new_index - + if "episode_index" in df.columns: + print( + f"更新episode_index从 {df['episode_index'].iloc[0]} 到 {new_index} (Update episode_index from {df['episode_index'].iloc[0]} to {new_index})" + ) + df["episode_index"] = new_index + # 更新index列 (Update index column) - if 'index' in df.columns: + if "index" in df.columns: if episode_to_frame_index and new_index in episode_to_frame_index: # 使用预先计算的帧索引起始值 (Use pre-calculated frame index start value) first_index = episode_to_frame_index[new_index] - print(f"更新index列,起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))") + print( + f"更新index列,起始值: {first_index}(使用全局累积帧计数)(Update index column, start value: {first_index} (using global cumulative frame count))" + ) else: # 如果没有提供映射,使用当前的计算方式作为回退 # (If no mapping provided, use current calculation as fallback) first_index = new_index * len(df) - print(f"更新index列,起始值: {first_index}(使用episode索引乘以长度)(Update index column, start value: {first_index} (using episode index multiplied by length))") - + print( + f"更新index列,起始值: {first_index}(使用episode索引乘以长度)(Update index column, start value: {first_index} (using episode index multiplied by length))" + ) + # 更新所有帧的索引 (Update indices for all frames) - df['index'] = [first_index + i for i in range(len(df))] - + df["index"] = [first_index + i for i in range(len(df))] + # 更新task_index列 (Update task_index column) - if 'task_index' in df.columns and folder_task_mapping and old_folder in folder_task_mapping: + if ( + "task_index" in df.columns + and folder_task_mapping + and old_folder in folder_task_mapping + ): # 获取当前task_index (Get current task_index) - current_task_index = df['task_index'].iloc[0] - + current_task_index = df["task_index"].iloc[0] + # 检查是否有对应的新索引 (Check if there's a corresponding new index) if current_task_index in folder_task_mapping[old_folder]: new_task_index = folder_task_mapping[old_folder][current_task_index] - print(f"更新task_index从 {current_task_index} 到 {new_task_index} (Update task_index from {current_task_index} to {new_task_index})") - df['task_index'] = new_task_index + print( + f"更新task_index从 {current_task_index} 到 {new_task_index} (Update task_index from {current_task_index} to {new_task_index})" + ) + df["task_index"] = new_task_index else: - print(f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})") - + print( + f"警告: 找不到task_index {current_task_index}的映射关系 (Warning: No mapping found for task_index {current_task_index})" + ) + # 计算chunk编号 (Calculate chunk number) chunk_index = new_index // chunks_size - + # 创建正确的目标目录 (Create correct target directory) chunk_dir = os.path.join(output_folder, "data", 
f"chunk-{chunk_index:03d}") os.makedirs(chunk_dir, exist_ok=True) - + # 构建正确的目标路径 (Build correct target path) dest_path = os.path.join(chunk_dir, f"episode_{new_index:06d}.parquet") - + # 保存到正确位置 (Save to correct location) df.to_parquet(dest_path, index=False) - + total_copied += 1 found = True print(f"已处理并保存: {dest_path} (Processed and saved: {dest_path})") @@ -718,34 +795,37 @@ def copy_data_files(source_folders, output_folder, episode_mapping, max_dim=18, total_failed += 1 if found: break - + if not found: error_msg = f"找不到episode {old_index}的parquet文件,源文件夹: {old_folder}" print(error_msg) - failed_files.append({"file": f"episode_{old_index:06d}.parquet", "reason": "文件未找到", "folder": old_folder}) + failed_files.append( + {"file": f"episode_{old_index:06d}.parquet", "reason": "文件未找到", "folder": old_folder} + ) total_failed += 1 - + print(f"共复制 {total_copied} 个数据文件,{total_failed} 个失败") - + # 打印所有失败的文件详情 (Print details of all failed files) if failed_files: print("\n失败的文件详情 (Details of failed files):") for i, failed in enumerate(failed_files): - print(f"{i+1}. 文件 (File): {failed['file']}") - if 'folder' in failed: + print(f"{i + 1}. 文件 (File): {failed['file']}") + if "folder" in failed: print(f" 文件夹 (Folder): {failed['folder']}") - if 'episode' in failed: + if "episode" in failed: print(f" Episode索引 (Episode index): {failed['episode']}") print(f" 原因 (Reason): {failed['reason']}") print("---") - + return total_copied > 0 + def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18): """ 通过零填充将parquet数据从原始维度扩展到目标维度 (Extend parquet data from original dimension to target dimension by zero-padding) - + Args: source_path (str): 源parquet文件路径 (Source parquet file path) target_path (str): 目标parquet文件路径 (Target parquet file path) @@ -754,66 +834,69 @@ def pad_parquet_data(source_path, target_path, original_dim=14, target_dim=18): """ # 读取parquet文件 df = pd.read_parquet(source_path) - + # 打印列名以便调试 print(f"Columns in {source_path}: {df.columns.tolist()}") - + # 创建新的DataFrame来存储填充后的数据 new_df = df.copy() - + # 检查observation.state和action列是否存在 - if 'observation.state' in df.columns: + if "observation.state" in df.columns: # 检查第一行数据,确认是否为向量 - first_state = df['observation.state'].iloc[0] + first_state = df["observation.state"].iloc[0] print(f"First observation.state type: {type(first_state)}, value: {first_state}") - + # 如果是向量(列表或numpy数组) if isinstance(first_state, (list, np.ndarray)): # 检查维度 state_dim = len(first_state) print(f"observation.state dimension: {state_dim}") - + if state_dim < target_dim: # 填充向量 print(f"Padding observation.state from {state_dim} to {target_dim} dimensions") - new_df['observation.state'] = df['observation.state'].apply( - lambda x: np.pad(x, (0, target_dim - len(x)), 'constant').tolist() + new_df["observation.state"] = df["observation.state"].apply( + lambda x: np.pad(x, (0, target_dim - len(x)), "constant").tolist() ) - + # 同样处理action列 - if 'action' in df.columns: + if "action" in df.columns: # 检查第一行数据 - first_action = df['action'].iloc[0] + first_action = df["action"].iloc[0] print(f"First action type: {type(first_action)}, value: {first_action}") - + # 如果是向量 if isinstance(first_action, (list, np.ndarray)): # 检查维度 action_dim = len(first_action) print(f"action dimension: {action_dim}") - + if action_dim < target_dim: # 填充向量 print(f"Padding action from {action_dim} to {target_dim} dimensions") - new_df['action'] = df['action'].apply( - lambda x: np.pad(x, (0, target_dim - len(x)), 'constant').tolist() + new_df["action"] = df["action"].apply( + lambda x: 
np.pad(x, (0, target_dim - len(x)), "constant").tolist() ) - + # 确保目标目录存在 os.makedirs(os.path.dirname(target_path), exist_ok=True) - + # 保存到新的parquet文件 new_df.to_parquet(target_path, index=False) - + print(f"已将{source_path}处理并保存到{target_path}") - + return new_df -def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, max_dim=18, default_fps=20): + +def merge_datasets( + source_folders, output_folder, validate_ts=False, tolerance_s=1e-4, max_dim=18, default_fps=20 +): """ 将多个数据集文件夹合并为一个,处理索引、维度和元数据 (Merge multiple dataset folders into one, handling indices, dimensions, and metadata) - + Args: source_folders (list): 源数据集文件夹路径列表 (List of source dataset folder paths) output_folder (str): 输出文件夹路径 (Output folder path) @@ -821,7 +904,7 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s tolerance_s (float): 时间戳不连续性的容差值,以秒为单位 (Tolerance for timestamp discontinuities in seconds) max_dim (int): 向量的最大维度 (Maximum dimension for vectors) default_fps (float): 默认帧率 (Default frame rate) - + 这个函数执行以下操作: (This function performs the following operations:) 1. 合并所有的episodes、tasks和stats (Merges all episodes, tasks and stats) @@ -832,8 +915,8 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s """ # Create output folder if it doesn't exist os.makedirs(output_folder, exist_ok=True) - os.makedirs(os.path.join(output_folder, 'meta'), exist_ok=True) - + os.makedirs(os.path.join(output_folder, "meta"), exist_ok=True) + # 注释掉时间戳验证 # if validate_ts: # issues, fps_values = validate_timestamps(source_folders, tolerance_s) @@ -841,7 +924,7 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s # print("时间戳验证发现以下问题:") # for issue in issues: # print(f" - {issue}") - # + # # # 获取共同的FPS值 # if fps_values: # fps = max(set(fps_values), key=fps_values.count) @@ -852,73 +935,75 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s # else: fps = default_fps print(f"使用默认FPS值: {fps}") - + # Load episodes from all source folders all_episodes = [] all_episodes_stats = [] all_tasks = [] - + total_frames = 0 total_episodes = 0 - + # Keep track of episode mapping (old_folder, old_index, new_index) episode_mapping = [] - + # Collect all stats for proper merging all_stats_data = [] - + # Track dimensions for each folder folder_dimensions = {} - + # 添加一个变量来跟踪累积的帧数 cumulative_frame_count = 0 - + # 创建一个映射,用于存储每个新的episode索引对应的起始帧索引 episode_to_frame_index = {} - + # 创建一个映射,用于跟踪旧的任务描述到新任务索引的映射 task_desc_to_new_index = {} # 创建一个映射,用于存储每个源文件夹和旧任务索引到新任务索引的映射 folder_task_mapping = {} - + # 首先收集所有不同的任务描述 all_unique_tasks = [] - + # 从info.json获取chunks_size - info_path = os.path.join(source_folders[0], 'meta', 'info.json') + info_path = os.path.join(source_folders[0], "meta", "info.json") chunks_size = 1000 # 默认值 if os.path.exists(info_path): - with open(info_path, 'r') as f: + with open(info_path) as f: info = json.load(f) - chunks_size = info.get('chunks_size', 1000) - + chunks_size = info.get("chunks_size", 1000) + # 使用更简单的方法计算视频总数 (Use simpler method to calculate total videos) total_videos = 0 - + for folder in source_folders: try: # 从每个数据集的info.json直接获取total_videos # (Get total_videos directly from each dataset's info.json) - folder_info_path = os.path.join(folder, 'meta', 'info.json') + folder_info_path = os.path.join(folder, "meta", "info.json") if os.path.exists(folder_info_path): - with open(folder_info_path, 'r') as f: + with open(folder_info_path) as f: folder_info = json.load(f) - if 
'total_videos' in folder_info: - folder_videos = folder_info['total_videos'] + if "total_videos" in folder_info: + folder_videos = folder_info["total_videos"] total_videos += folder_videos - print(f"从{folder}的info.json中读取到视频数量: {folder_videos} (Read video count from {folder}'s info.json: {folder_videos})") - + print( + f"从{folder}的info.json中读取到视频数量: {folder_videos} (Read video count from {folder}'s info.json: {folder_videos})" + ) + # Check dimensions of state vectors in this folder folder_dim = max_dim # 使用变量替代硬编码的18 - + # Try to find a parquet file to determine dimensions for root, dirs, files in os.walk(folder): for file in files: - if file.endswith('.parquet'): + if file.endswith(".parquet"): try: df = pd.read_parquet(os.path.join(root, file)) - if 'observation.state' in df.columns: - first_state = df['observation.state'].iloc[0] + if "observation.state" in df.columns: + first_state = df["observation.state"].iloc[0] if isinstance(first_state, (list, np.ndarray)): folder_dim = len(first_state) print(f"Detected {folder_dim} dimensions in {folder}") @@ -928,127 +1013,131 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s break if folder_dim != max_dim: # 使用变量替代硬编码的18 break - + folder_dimensions[folder] = folder_dim - + # Load episodes - episodes_path = os.path.join(folder, 'meta', 'episodes.jsonl') + episodes_path = os.path.join(folder, "meta", "episodes.jsonl") if not os.path.exists(episodes_path): print(f"Warning: Episodes file not found in {folder}, skipping") continue - + episodes = load_jsonl(episodes_path) - + # Load episode stats - episodes_stats_path = os.path.join(folder, 'meta', 'episodes_stats.jsonl') + episodes_stats_path = os.path.join(folder, "meta", "episodes_stats.jsonl") episodes_stats = [] if os.path.exists(episodes_stats_path): episodes_stats = load_jsonl(episodes_stats_path) - + # Create a mapping of episode_index to stats stats_map = {} for stat in episodes_stats: - if 'episode_index' in stat: - stats_map[stat['episode_index']] = stat - + if "episode_index" in stat: + stats_map[stat["episode_index"]] = stat + # Load tasks - tasks_path = os.path.join(folder, 'meta', 'tasks.jsonl') + tasks_path = os.path.join(folder, "meta", "tasks.jsonl") folder_tasks = [] if os.path.exists(tasks_path): folder_tasks = load_jsonl(tasks_path) - + # 创建此文件夹的任务映射 folder_task_mapping[folder] = {} - + # 处理每个任务 for task in folder_tasks: - task_desc = task['task'] - old_index = task['task_index'] - + task_desc = task["task"] + old_index = task["task_index"] + # 检查任务描述是否已存在 if task_desc not in task_desc_to_new_index: # 添加新任务描述,分配新索引 new_index = len(all_unique_tasks) task_desc_to_new_index[task_desc] = new_index all_unique_tasks.append({"task_index": new_index, "task": task_desc}) - + # 保存此文件夹中旧索引到新索引的映射 folder_task_mapping[folder][old_index] = task_desc_to_new_index[task_desc] - + # Process all episodes from this folder for episode in episodes: - old_index = episode['episode_index'] + old_index = episode["episode_index"] new_index = total_episodes - + # Update episode index - episode['episode_index'] = new_index + episode["episode_index"] = new_index all_episodes.append(episode) - + # Update stats if available if old_index in stats_map: stats = stats_map[old_index] - stats['episode_index'] = new_index - + stats["episode_index"] = new_index + # Pad stats data if needed - if 'stats' in stats and folder_dimensions[folder] < max_dim: # 使用变量替代硬编码的18 + if "stats" in stats and folder_dimensions[folder] < max_dim: # 使用变量替代硬编码的18 # Pad observation.state and action stats - 
for feature in ['observation.state', 'action']: - if feature in stats['stats']: - for stat_type in ['mean', 'std', 'max', 'min']: - if stat_type in stats['stats'][feature]: + for feature in ["observation.state", "action"]: + if feature in stats["stats"]: + for stat_type in ["mean", "std", "max", "min"]: + if stat_type in stats["stats"][feature]: # Get current values - values = stats['stats'][feature][stat_type] - + values = stats["stats"][feature][stat_type] + # Check if it's a list/array that needs padding - if isinstance(values, list) and len(values) < max_dim: # 使用变量替代硬编码的18 + if ( + isinstance(values, list) and len(values) < max_dim + ): # 使用变量替代硬编码的18 # Pad with zeros - padded = values + [0.0] * (max_dim - len(values)) # 使用变量替代硬编码的18 - stats['stats'][feature][stat_type] = padded - + padded = values + [0.0] * ( + max_dim - len(values) + ) # 使用变量替代硬编码的18 + stats["stats"][feature][stat_type] = padded + all_episodes_stats.append(stats) - + # Add to all_stats_data for proper merging - if 'stats' in stats: - all_stats_data.append(stats['stats']) - + if "stats" in stats: + all_stats_data.append(stats["stats"]) + # Add to mapping episode_mapping.append((folder, old_index, new_index)) - + # Update counters total_episodes += 1 - total_frames += episode['length'] - + total_frames += episode["length"] + # 处理每个episode时收集此信息 episode_to_frame_index[new_index] = cumulative_frame_count - cumulative_frame_count += episode['length'] - + cumulative_frame_count += episode["length"] + # 使用收集的唯一任务列表替换之前的任务处理逻辑 all_tasks = all_unique_tasks - + except Exception as e: print(f"Error processing folder {folder}: {e}") continue - + print(f"Processed {total_episodes} episodes from {len(source_folders)} folders") - + # Save combined episodes and stats - save_jsonl(all_episodes, os.path.join(output_folder, 'meta', 'episodes.jsonl')) - save_jsonl(all_episodes_stats, os.path.join(output_folder, 'meta', 'episodes_stats.jsonl')) - save_jsonl(all_tasks, os.path.join(output_folder, 'meta', 'tasks.jsonl')) - + save_jsonl(all_episodes, os.path.join(output_folder, "meta", "episodes.jsonl")) + save_jsonl(all_episodes_stats, os.path.join(output_folder, "meta", "episodes_stats.jsonl")) + save_jsonl(all_tasks, os.path.join(output_folder, "meta", "tasks.jsonl")) + # Merge and save stats stats_list = [] for folder in source_folders: - stats_path = os.path.join(folder, 'meta', 'stats.json') + stats_path = os.path.join(folder, "meta", "stats.json") if os.path.exists(stats_path): - with open(stats_path, 'r') as f: + with open(stats_path) as f: stats = json.load(f) stats_list.append(stats) - + if stats_list: # Merge global stats merged_stats = merge_stats(stats_list) - + # Update merged stats with episode-specific stats if available if all_stats_data: # For each feature in the stats @@ -1056,145 +1145,165 @@ def merge_datasets(source_folders, output_folder, validate_ts=False, tolerance_s if feature in all_stats_data[0]: # Recalculate statistics based on all episodes values = [stat[feature] for stat in all_stats_data if feature in stat] - + # Find the maximum dimension for this feature - max_dim = max(len(np.array(val.get('mean', [0])).flatten()) for val in values if 'mean' in val) - + max_dim = max( + len(np.array(val.get("mean", [0])).flatten()) for val in values if "mean" in val + ) + # Update count - if 'count' in merged_stats[feature]: - merged_stats[feature]['count'] = [sum(stat.get('count', [0])[0] for stat in values if 'count' in stat)] - + if "count" in merged_stats[feature]: + merged_stats[feature]["count"] = [ + 
sum(stat.get("count", [0])[0] for stat in values if "count" in stat) + ] + # Update min/max with padding - if 'min' in merged_stats[feature] and all('min' in stat for stat in values): + if "min" in merged_stats[feature] and all("min" in stat for stat in values): # Pad min values padded_mins = [] for val in values: - val_array = np.array(val['min']) + val_array = np.array(val["min"]) val_flat = val_array.flatten() if len(val_flat) < max_dim: padded = np.zeros(max_dim) - padded[:len(val_flat)] = val_flat + padded[: len(val_flat)] = val_flat padded_mins.append(padded) else: padded_mins.append(val_flat) - merged_stats[feature]['min'] = np.minimum.reduce(padded_mins).tolist() - - if 'max' in merged_stats[feature] and all('max' in stat for stat in values): + merged_stats[feature]["min"] = np.minimum.reduce(padded_mins).tolist() + + if "max" in merged_stats[feature] and all("max" in stat for stat in values): # Pad max values padded_maxs = [] for val in values: - val_array = np.array(val['max']) + val_array = np.array(val["max"]) val_flat = val_array.flatten() if len(val_flat) < max_dim: padded = np.zeros(max_dim) - padded[:len(val_flat)] = val_flat + padded[: len(val_flat)] = val_flat padded_maxs.append(padded) else: padded_maxs.append(val_flat) - merged_stats[feature]['max'] = np.maximum.reduce(padded_maxs).tolist() - + merged_stats[feature]["max"] = np.maximum.reduce(padded_maxs).tolist() + # Update mean and std (weighted by count if available) - if 'mean' in merged_stats[feature] and all('mean' in stat for stat in values): + if "mean" in merged_stats[feature] and all("mean" in stat for stat in values): # Pad mean values padded_means = [] for val in values: - val_array = np.array(val['mean']) + val_array = np.array(val["mean"]) val_flat = val_array.flatten() if len(val_flat) < max_dim: padded = np.zeros(max_dim) - padded[:len(val_flat)] = val_flat + padded[: len(val_flat)] = val_flat padded_means.append(padded) else: padded_means.append(val_flat) - - if all('count' in stat for stat in values): - counts = [stat['count'][0] for stat in values] + + if all("count" in stat for stat in values): + counts = [stat["count"][0] for stat in values] total_count = sum(counts) - weighted_means = [mean * count / total_count for mean, count in zip(padded_means, counts)] - merged_stats[feature]['mean'] = np.sum(weighted_means, axis=0).tolist() + weighted_means = [ + mean * count / total_count + for mean, count in zip(padded_means, counts, strict=False) + ] + merged_stats[feature]["mean"] = np.sum(weighted_means, axis=0).tolist() else: - merged_stats[feature]['mean'] = np.mean(padded_means, axis=0).tolist() - - if 'std' in merged_stats[feature] and all('std' in stat for stat in values): + merged_stats[feature]["mean"] = np.mean(padded_means, axis=0).tolist() + + if "std" in merged_stats[feature] and all("std" in stat for stat in values): # Pad std values padded_stds = [] for val in values: - val_array = np.array(val['std']) + val_array = np.array(val["std"]) val_flat = val_array.flatten() if len(val_flat) < max_dim: padded = np.zeros(max_dim) - padded[:len(val_flat)] = val_flat + padded[: len(val_flat)] = val_flat padded_stds.append(padded) else: padded_stds.append(val_flat) - - if all('count' in stat for stat in values): - counts = [stat['count'][0] for stat in values] + + if all("count" in stat for stat in values): + counts = [stat["count"][0] for stat in values] total_count = sum(counts) variances = [std**2 for std in padded_stds] - weighted_variances = [var * count / total_count for var, count in 
zip(variances, counts)] - merged_stats[feature]['std'] = np.sqrt(np.sum(weighted_variances, axis=0)).tolist() + weighted_variances = [ + var * count / total_count + for var, count in zip(variances, counts, strict=False) + ] + merged_stats[feature]["std"] = np.sqrt( + np.sum(weighted_variances, axis=0) + ).tolist() else: # Simple average of standard deviations - merged_stats[feature]['std'] = np.mean(padded_stds, axis=0).tolist() - - with open(os.path.join(output_folder, 'meta', 'stats.json'), 'w') as f: + merged_stats[feature]["std"] = np.mean(padded_stds, axis=0).tolist() + + with open(os.path.join(output_folder, "meta", "stats.json"), "w") as f: json.dump(merged_stats, f, indent=4) - + # Update and save info.json - info_path = os.path.join(source_folders[0], 'meta', 'info.json') - with open(info_path, 'r') as f: + info_path = os.path.join(source_folders[0], "meta", "info.json") + with open(info_path) as f: info = json.load(f) - + # Update info with correct counts - info['total_episodes'] = total_episodes - info['total_frames'] = total_frames - info['total_tasks'] = len(all_tasks) - info['total_chunks'] = (total_episodes + info['chunks_size'] - 1) // info['chunks_size'] # Ceiling division - + info["total_episodes"] = total_episodes + info["total_frames"] = total_frames + info["total_tasks"] = len(all_tasks) + info["total_chunks"] = (total_episodes + info["chunks_size"] - 1) // info[ + "chunks_size" + ] # Ceiling division + # Update splits - info['splits'] = {'train': f'0:{total_episodes}'} - + info["splits"] = {"train": f"0:{total_episodes}"} + # Update feature dimensions to the maximum dimension - if 'features' in info: + if "features" in info: # Find the maximum dimension across all folders actual_max_dim = max_dim # 使用变量替代硬编码的18 for folder, dim in folder_dimensions.items(): actual_max_dim = max(actual_max_dim, dim) - + # Update observation.state and action dimensions - for feature_name in ['observation.state', 'action']: - if feature_name in info['features']: + for feature_name in ["observation.state", "action"]: + if feature_name in info["features"]: # Update shape to the maximum dimension - if 'shape' in info['features'][feature_name]: - info['features'][feature_name]['shape'] = [actual_max_dim] + if "shape" in info["features"][feature_name]: + info["features"][feature_name]["shape"] = [actual_max_dim] print(f"Updated {feature_name} shape to {actual_max_dim}") # 更新视频总数 (Update total videos) - info['total_videos'] = total_videos + info["total_videos"] = total_videos print(f"更新视频总数为: {total_videos} (Update total videos to: {total_videos})") - with open(os.path.join(output_folder, 'meta', 'info.json'), 'w') as f: + with open(os.path.join(output_folder, "meta", "info.json"), "w") as f: json.dump(info, f, indent=4) - + # Copy video and data files copy_videos(source_folders, output_folder, episode_mapping) - copy_data_files(source_folders, output_folder, episode_mapping, - max_dim=max_dim, fps=fps, - episode_to_frame_index=episode_to_frame_index, - folder_task_mapping=folder_task_mapping, - chunks_size=chunks_size) - + copy_data_files( + source_folders, + output_folder, + episode_mapping, + max_dim=max_dim, + fps=fps, + episode_to_frame_index=episode_to_frame_index, + folder_task_mapping=folder_task_mapping, + chunks_size=chunks_size, + ) + print(f"Merged {total_episodes} episodes with {total_frames} frames into {output_folder}") + if __name__ == "__main__": # Define source folders and output folder - source_folders = ["/path/to/put_plastic_to_box/", 
"/path/to/put_metal_to_box/","/path/to/Find_blue_box"] - + source_folders = ["/path/to/put_plastic_to_box/", "/path/to/put_metal_to_box/", "/path/to/Find_blue_box"] + output_folder = "/data1/realman/lerobot_merged_test/" - + # 设置默认FPS default_fps = 20 - + # Merge the datasets - merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps) + merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps) From 2e525a5a0ae5850a022cda092c72e2b743978422 Mon Sep 17 00:00:00 2001 From: zhipeng tang <2444198418@qq.com> Date: Wed, 2 Apr 2025 14:28:55 +0800 Subject: [PATCH 4/9] Update merge.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit fixes 8 linter warnings in the merge.py file, including: 1.Added contextlib import and used contextlib.suppress instead of the try-except-pass pattern 2.Removed unnecessary .keys() calls, using Pythonic way to iterate dictionaries directly 3.Renamed unused loop variables with underscore prefix (idx → _idx, dirs → _dirs, folder → _folder) 4. Combined nested if statements to improve code conciseness These changes maintain the same functionality while improving code quality and readability to conform to the project's coding standards. --- lerobot/scripts/merge.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py index 6763ecf8..4e7ab3ca 100644 --- a/lerobot/scripts/merge.py +++ b/lerobot/scripts/merge.py @@ -2,6 +2,7 @@ import json import os import shutil import traceback +import contextlib import numpy as np import pandas as pd @@ -43,10 +44,8 @@ def load_jsonl(file_path): with open(file_path) as f: for line in f: if line.strip(): - try: + with contextlib.suppress(json.JSONDecodeError): data.append(json.loads(line)) - except json.JSONDecodeError: - pass except Exception as e: print(f"Error loading {file_path} line by line: {e}") else: @@ -54,10 +53,8 @@ def load_jsonl(file_path): with open(file_path) as f: for line in f: if line.strip(): - try: + with contextlib.suppress(json.JSONDecodeError): data.append(json.loads(line)) - except json.JSONDecodeError: - print(f"Warning: Could not parse line in {file_path}: {line[:100]}...") return data @@ -97,7 +94,7 @@ def merge_stats(stats_list): common_features = common_features.intersection(set(stats.keys())) # Process features in the order they appear in the first stats file - for feature in stats_list[0].keys(): + for feature in stats_list[0]: if feature not in common_features: continue @@ -606,7 +603,7 @@ def copy_data_files( for feature in ["observation.state", "action"]: if feature in df.columns: # 检查第一个非空值 (Check first non-null value) - for idx, value in enumerate(df[feature]): + for _idx, value in enumerate(df[feature]): if value is not None and isinstance(value, (list, np.ndarray)): current_dim = len(value) if current_dim < max_dim: @@ -704,7 +701,7 @@ def copy_data_files( for feature in ["observation.state", "action"]: if feature in df.columns: # 检查第一个非空值 (Check first non-null value) - for idx, value in enumerate(df[feature]): + for _idx, value in enumerate(df[feature]): if value is not None and isinstance(value, (list, np.ndarray)): current_dim = len(value) if current_dim < max_dim: @@ -997,7 +994,7 @@ def merge_datasets( folder_dim = max_dim # 使用变量替代硬编码的18 # Try to find a parquet file to determine dimensions - for root, dirs, files in os.walk(folder): + for root, _dirs, files in os.walk(folder): for file in files: if 
file.endswith(".parquet"): try: @@ -1141,7 +1138,7 @@ def merge_datasets( # Update merged stats with episode-specific stats if available if all_stats_data: # For each feature in the stats - for feature in merged_stats.keys(): + for feature in merged_stats: if feature in all_stats_data[0]: # Recalculate statistics based on all episodes values = [stat[feature] for stat in all_stats_data if feature in stat] @@ -1262,16 +1259,14 @@ def merge_datasets( if "features" in info: # Find the maximum dimension across all folders actual_max_dim = max_dim # 使用变量替代硬编码的18 - for folder, dim in folder_dimensions.items(): + for _folder, dim in folder_dimensions.items(): actual_max_dim = max(actual_max_dim, dim) # Update observation.state and action dimensions for feature_name in ["observation.state", "action"]: - if feature_name in info["features"]: - # Update shape to the maximum dimension - if "shape" in info["features"][feature_name]: - info["features"][feature_name]["shape"] = [actual_max_dim] - print(f"Updated {feature_name} shape to {actual_max_dim}") + if feature_name in info["features"] and "shape" in info["features"][feature_name]: + info["features"][feature_name]["shape"] = [actual_max_dim] + print(f"Updated {feature_name} shape to {actual_max_dim}") # 更新视频总数 (Update total videos) info["total_videos"] = total_videos From 1974bc77057e6e4e7a75a9e146eee00cba4ad359 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 2 Apr 2025 06:29:11 +0000 Subject: [PATCH 5/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- lerobot/scripts/merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py index 4e7ab3ca..c339eb89 100644 --- a/lerobot/scripts/merge.py +++ b/lerobot/scripts/merge.py @@ -1,8 +1,8 @@ +import contextlib import json import os import shutil import traceback -import contextlib import numpy as np import pandas as pd From 60359dfff480f384a2fade1a016479ab5cce1855 Mon Sep 17 00:00:00 2001 From: zhipeng tang <2444198418@qq.com> Date: Thu, 10 Apr 2025 12:23:35 +0800 Subject: [PATCH 6/9] Update merge.py Modifications Introduced the argparse module to parse command-line arguments Converted previously hardcoded source folder paths, output folder path, max_dim, and FPS values to configurable command-line parameters Important Note Note: When merging datasets, the FPS for each collected task must be identical to ensure data synchronization and consistency --- lerobot/scripts/merge.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py index c339eb89..61bab7f8 100644 --- a/lerobot/scripts/merge.py +++ b/lerobot/scripts/merge.py @@ -1292,13 +1292,21 @@ def merge_datasets( if __name__ == "__main__": - # Define source folders and output folder - source_folders = ["/path/to/put_plastic_to_box/", "/path/to/put_metal_to_box/", "/path/to/Find_blue_box"] - - output_folder = "/data1/realman/lerobot_merged_test/" - - # 设置默认FPS - default_fps = 20 - - # Merge the datasets - merge_datasets(source_folders, output_folder, max_dim=32, default_fps=default_fps) + # Set up argument parser + parser = argparse.ArgumentParser(description='Merge datasets from multiple sources.') + + # Add arguments + parser.add_argument('--sources', nargs='+', required=True, + help='List of source folder paths') + parser.add_argument('--output', required=True, + 
help='Output folder path') + parser.add_argument('--max_dim', type=int, default=32, + help='Maximum dimension (default: 32)') + parser.add_argument('--fps', type=int, default=20, + help='Your datasets FPS (default: 20)') + + # Parse arguments + args = parser.parse_args() + + # Use parsed arguments + merge_datasets(args.sources, args.output, max_dim=args.max_dim, default_fps=args.fps) From 391355c9913d8239e6f317e1656954d2dcccec91 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 10 Apr 2025 04:23:50 +0000 Subject: [PATCH 7/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- lerobot/scripts/merge.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py index 61bab7f8..8cf0dffd 100644 --- a/lerobot/scripts/merge.py +++ b/lerobot/scripts/merge.py @@ -1293,20 +1293,16 @@ def merge_datasets( if __name__ == "__main__": # Set up argument parser - parser = argparse.ArgumentParser(description='Merge datasets from multiple sources.') - + parser = argparse.ArgumentParser(description="Merge datasets from multiple sources.") + # Add arguments - parser.add_argument('--sources', nargs='+', required=True, - help='List of source folder paths') - parser.add_argument('--output', required=True, - help='Output folder path') - parser.add_argument('--max_dim', type=int, default=32, - help='Maximum dimension (default: 32)') - parser.add_argument('--fps', type=int, default=20, - help='Your datasets FPS (default: 20)') - + parser.add_argument("--sources", nargs="+", required=True, help="List of source folder paths") + parser.add_argument("--output", required=True, help="Output folder path") + parser.add_argument("--max_dim", type=int, default=32, help="Maximum dimension (default: 32)") + parser.add_argument("--fps", type=int, default=20, help="Your datasets FPS (default: 20)") + # Parse arguments args = parser.parse_args() - + # Use parsed arguments merge_datasets(args.sources, args.output, max_dim=args.max_dim, default_fps=args.fps) From 6b64bdf4b0a70da6bd21eca039006a5b4d9026a7 Mon Sep 17 00:00:00 2001 From: zhipeng tang <2444198418@qq.com> Date: Thu, 10 Apr 2025 12:25:32 +0800 Subject: [PATCH 8/9] Update merge.py --- lerobot/scripts/merge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py index 8cf0dffd..f9761f6d 100644 --- a/lerobot/scripts/merge.py +++ b/lerobot/scripts/merge.py @@ -1,9 +1,9 @@ +import argparse import contextlib import json import os import shutil import traceback - import numpy as np import pandas as pd From c4bc5f5b3002bc86c181e51bce41b3358505371e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 10 Apr 2025 04:25:44 +0000 Subject: [PATCH 9/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- lerobot/scripts/merge.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lerobot/scripts/merge.py b/lerobot/scripts/merge.py index f9761f6d..660ea9c2 100644 --- a/lerobot/scripts/merge.py +++ b/lerobot/scripts/merge.py @@ -4,6 +4,7 @@ import json import os import shutil import traceback + import numpy as np import pandas as pd
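
For reference, with the argparse interface from PATCH 6/9 in place, the script is driven from the command line rather than by editing hardcoded paths. An illustrative invocation (all paths here are placeholders) would look like:

python lerobot/scripts/merge.py --sources /path/to/dataset_a /path/to/dataset_b --output /path/to/merged_dataset --max_dim 32 --fps 20

As the PATCH 6/9 message notes, the source datasets must already share the same FPS; judging from the final call in the script, --fps is simply forwarded to merge_datasets as default_fps rather than used to resample anything.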
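
The statistics-merging arithmetic is easy to lose inside the diff, so the short standalone sketch below mirrors what the patched merge_datasets does for one feature: zero-pad every per-dataset stat vector to a common dimension (the same padding idea is applied per frame to observation.state and action in copy_data_files), take the element-wise min/max, weight the means by frame count, and pool the variances by frame count before taking the square root for std. The helper names (pad_to, combine_feature_stats) and the sample numbers are hypothetical illustrations, not part of merge.py.

# Standalone illustration of the stats-merging arithmetic used in the patch.
import numpy as np


def pad_to(values, dim):
    # Zero-pad a 1-D stat vector to a common dimension, as the patch does
    # before element-wise min/max and weighted averaging.
    flat = np.asarray(values, dtype=float).flatten()
    out = np.zeros(dim)
    out[: len(flat)] = flat
    return out


def combine_feature_stats(stats_list, max_dim):
    counts = np.array([s["count"][0] for s in stats_list], dtype=float)
    total = counts.sum()
    mins = np.stack([pad_to(s["min"], max_dim) for s in stats_list])
    maxs = np.stack([pad_to(s["max"], max_dim) for s in stats_list])
    means = np.stack([pad_to(s["mean"], max_dim) for s in stats_list])
    stds = np.stack([pad_to(s["std"], max_dim) for s in stats_list])
    weights = counts[:, None] / total
    return {
        "count": [int(total)],
        "min": mins.min(axis=0).tolist(),
        "max": maxs.max(axis=0).tolist(),
        # Frame-count-weighted mean, matching the weighted_means sum in the patch.
        "mean": (means * weights).sum(axis=0).tolist(),
        # Count-weighted pooling of variances, matching weighted_variances in the patch.
        "std": np.sqrt(((stds**2) * weights).sum(axis=0)).tolist(),
    }


# Hypothetical inputs: a 2-dof dataset (100 frames) and a 3-dof dataset (300 frames).
stats_a = {"count": [100], "min": [-1.0, -1.0], "max": [1.0, 1.0], "mean": [0.2, 0.0], "std": [0.1, 0.2]}
stats_b = {"count": [300], "min": [-2.0, 0.0, -1.0], "max": [2.0, 1.0, 1.0], "mean": [0.5, 0.1, 0.0], "std": [0.3, 0.1, 0.2]}
print(combine_feature_stats([stats_a, stats_b], max_dim=3))

Two properties of the patch carry over to this sketch: padded dimensions contribute zeros to the merged min/max/mean, and the variance pooling omits the between-dataset mean-shift term, so the merged std is an approximation whenever the datasets' means differ.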