From 4843988d81e0ab766a65a30d62bdce4aba9973e7 Mon Sep 17 00:00:00 2001
From: Remi Cadene
Date: Wed, 22 May 2024 15:15:13 +0000
Subject: [PATCH] fix

---
 .../push_dataset_to_hub/aloha_dora_format.py  | 105 +++++++++---------
 1 file changed, 53 insertions(+), 52 deletions(-)

diff --git a/lerobot/common/datasets/push_dataset_to_hub/aloha_dora_format.py b/lerobot/common/datasets/push_dataset_to_hub/aloha_dora_format.py
index b7f9f870..9db26c7c 100644
--- a/lerobot/common/datasets/push_dataset_to_hub/aloha_dora_format.py
+++ b/lerobot/common/datasets/push_dataset_to_hub/aloha_dora_format.py
@@ -18,8 +18,6 @@ Contains utilities to process raw data format from dora-record
 """
 
 import logging
-import re
-import shutil
 from pathlib import Path
 
 import pandas as pd
@@ -35,7 +33,7 @@ from lerobot.common.utils.utils import init_logging
 
 def check_format(raw_dir) -> bool:
     # TODO(rcadene): remove hardcoding
-    raw_dir = raw_dir / "018f9c37-c092-72fd-bd83-6f5a5c1b59d2"
+    raw_dir = raw_dir / "018f9fdc-6b7b-7432-a529-40d2cc718032"
     assert raw_dir.exists()
 
     leader_file = list(raw_dir.glob("*.parquet"))
@@ -46,7 +44,7 @@ def check_format(raw_dir) -> bool:
 
 def load_from_raw(raw_dir: Path, out_dir: Path):
     # TODO(rcadene): remove hardcoding
-    raw_dir = raw_dir / "018f9c37-c092-72fd-bd83-6f5a5c1b59d2"
+    raw_dir = raw_dir / "018f9fdc-6b7b-7432-a529-40d2cc718032"
 
     # Load data stream that will be used as reference for the timestamps synchronization
     reference_key = "observation.images.cam_right_wrist"
@@ -54,82 +52,85 @@ def load_from_raw(raw_dir: Path, out_dir: Path):
     reference_df = reference_df[["timestamp_utc", reference_key]]
 
     # Merge all data streams using the nearest backward strategy
-    data_df = reference_df
+    df = reference_df
     for path in raw_dir.glob("*.parquet"):
         key = path.stem  # action or observation.state or ...
         if key == reference_key:
             continue
-        df = pd.read_parquet(path)
-        df = df[["timestamp_utc", key]]
-        data_df = pd.merge_asof(
-            data_df,
+        modality_df = pd.read_parquet(path)
+        modality_df = modality_df[["timestamp_utc", key]]
+        df = pd.merge_asof(
             df,
+            modality_df,
             on="timestamp_utc",
             direction="backward",
         )
 
-    # dora only use arrays, so single values are encapsulated into a list
-    data_df["episode_index"] = data_df["episode_index"].map(lambda x: x[0])
-    data_df["frame_index"] = data_df.groupby("episode_index").cumcount()
-    data_df["index"] = data_df.index
-
-    # set 'next.done' to True for the last frame of each episode
-    data_df["next.done"] = False
-    data_df.loc[data_df.groupby("episode_index").tail(1).index, "next.done"] = True
-
-    data_df["timestamp"] = data_df["timestamp_utc"].map(lambda x: x.timestamp())
-    # each episode starts with timestamp 0 to match the ones from the video
-    data_df["timestamp"] = data_df.groupby("episode_index")["timestamp"].transform(lambda x: x - x.iloc[0])
-
-    del data_df["timestamp_utc"]
-
     # Remove rows with a NaN in any column. It can happen during the first frames of an episode,
     # because some cameras didn't start recording yet.
-    data_df = data_df.dropna(axis=0)
+    df = df.dropna(axis=0)
+
+    # Remove rows with episode_index -1, which indicates a failed episode
+    df = df[df["episode_index"] != -1]
+
+    # dora only uses arrays, so single values are encapsulated in a list
+    df["episode_index"] = df["episode_index"].map(lambda x: x[0])
+    df["frame_index"] = df.groupby("episode_index").cumcount()
+    df = df.reset_index()
+    df["index"] = df.index
+
+    # set 'next.done' to True for the last frame of each episode
+    df["next.done"] = False
+    df.loc[df.groupby("episode_index").tail(1).index, "next.done"] = True
+
+    df["timestamp"] = df["timestamp_utc"].map(lambda x: x.timestamp())
+    # each episode starts with timestamp 0 to match the ones from the video
+    df["timestamp"] = df.groupby("episode_index")["timestamp"].transform(lambda x: x - x.iloc[0])
+
+    del df["timestamp_utc"]
+
+    # sanity check that episode indices go from 0 to n-1
+    ep_ids = [ep_idx for ep_idx, _ in df.groupby("episode_index")]
+    expected_ep_ids = list(range(df["episode_index"].max() + 1))
+    assert ep_ids == expected_ep_ids, f"Episode indices are {ep_ids} instead of {expected_ep_ids}"
 
     # Create symlink to raw videos directory (which needs to be absolute, not relative)
-    # out_dir.mkdir(parents=True, exist_ok=True)
-    # absolute_videos_dir = (raw_dir / "videos").absolute()
-    # (out_dir / "videos").symlink_to(absolute_videos_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    videos_dir = out_dir / "videos"
+    videos_dir.symlink_to((raw_dir / "videos").absolute())
 
-    # TODO(rcadene): remove before merge
-    (out_dir / "videos").mkdir(parents=True, exist_ok=True)
-    for from_path in (raw_dir / "videos").glob("*.mp4"):
-        match = re.search(r"_(\d+)\.mp4$", from_path.name)
-        if not match:
-            raise ValueError(from_path.name)
-        ep_idx = match.group(1)
-        to_path = out_dir / "videos" / from_path.name.replace(ep_idx, f"{int(ep_idx):06d}")
-        shutil.copy2(from_path, to_path)
+    # sanity check that the video paths are well formatted
+    for key in df:
+        if "observation.images." not in key:
+            continue
+        for ep_idx in ep_ids:
+            video_path = videos_dir / f"{key}_episode_{ep_idx:06d}.mp4"
+            assert video_path.exists(), f"Video file not found at {video_path}"
 
     data_dict = {}
-    for key in data_df:
+    for key in df:
         # is video frame
         if "observation.images." in key:
             # we need `[0]` because dora only uses arrays, so single values are encapsulated in a list.
             # it is the case for the video_frame dictionary = [{"path": ..., "timestamp": ...}]
-            data_dict[key] = [video_frame[0] for video_frame in data_df[key].values]
+            data_dict[key] = [video_frame[0] for video_frame in df[key].values]
 
-            # TODO(rcadene): remove before merge
-            for item in data_dict[key]:
-                path = item["path"]
-                match = re.search(r"_(\d+)\.mp4$", path)
-                if not match:
-                    raise ValueError(path)
-                ep_idx = match.group(1)
-                item["path"] = path.replace(ep_idx, f"{int(ep_idx):06d}")
+            # sanity check that the video path is well formatted
+            video_path = videos_dir.parent / data_dict[key][0]["path"]
+            assert video_path.exists(), f"Video file not found at {video_path}"
 
         # is number
-        elif data_df[key].iloc[0].ndim == 0 or data_df[key].iloc[0].shape[0] == 1:
-            data_dict[key] = torch.from_numpy(data_df[key].values)
+        elif df[key].iloc[0].ndim == 0 or df[key].iloc[0].shape[0] == 1:
+            data_dict[key] = torch.from_numpy(df[key].values)
 
         # is vector
-        elif data_df[key].iloc[0].shape[0] > 1:
-            data_dict[key] = torch.stack([torch.from_numpy(x.copy()) for x in data_df[key].values])
+        elif df[key].iloc[0].shape[0] > 1:
+            data_dict[key] = torch.stack([torch.from_numpy(x.copy()) for x in df[key].values])
 
         else:
             raise ValueError(key)
 
     # Get the first frame index for each unique episode index
-    first_ep_index_df = data_df.groupby("episode_index").agg(start_index=("index", "first")).reset_index()
+    first_ep_index_df = df.groupby("episode_index").agg(start_index=("index", "first")).reset_index()
     from_ = first_ep_index_df["start_index"].tolist()
-    to_ = from_[1:] + [len(data_df)]
+    to_ = from_[1:] + [len(df)]
     episode_data_index = {
         "from": from_,
         "to": to_,
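
Note for reviewers less familiar with pd.merge_asof: below is a minimal, self-contained sketch of the backward-direction alignment the merge loop in this patch relies on. The timestamps and values are synthetic; only the column names mirror the dataset.

    import pandas as pd

    # Reference stream: camera frames (synthetic ~30 Hz timestamps).
    reference = pd.DataFrame({
        "timestamp_utc": pd.to_datetime([
            "2024-05-22 00:00:00.000",
            "2024-05-22 00:00:00.033",
            "2024-05-22 00:00:00.066",
        ]),
        "observation.images.cam_right_wrist": ["frame0", "frame1", "frame2"],
    })

    # Second stream recorded on its own clock (synthetic joint states).
    states = pd.DataFrame({
        "timestamp_utc": pd.to_datetime([
            "2024-05-22 00:00:00.010",
            "2024-05-22 00:00:00.020",
            "2024-05-22 00:00:00.050",
        ]),
        "observation.state": [0.1, 0.2, 0.3],
    })

    # direction="backward" attaches to each reference row the most recent
    # state at or before its timestamp. frame0 gets NaN (no state exists
    # yet), which is exactly what the dropna() in the patch removes.
    merged = pd.merge_asof(reference, states, on="timestamp_utc", direction="backward")
    print(merged)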
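
The episode bookkeeping (frame_index, next.done, rebased timestamps) is compact but easy to misread; a toy illustration with made-up values:

    import pandas as pd

    df = pd.DataFrame({
        "episode_index": [0, 0, 0, 1, 1],
        "timestamp": [100.0, 100.1, 100.2, 200.0, 200.1],
    })

    # Per-episode frame counter: [0, 1, 2, 0, 1]
    df["frame_index"] = df.groupby("episode_index").cumcount()

    # tail(1) keeps the last row of each episode; .index gives those rows'
    # labels, so only the final frame of each episode gets next.done = True.
    df["next.done"] = False
    df.loc[df.groupby("episode_index").tail(1).index, "next.done"] = True

    # Rebase timestamps so each episode starts at 0, matching the videos:
    # [0.0, 0.1, 0.2, 0.0, 0.1]
    df["timestamp"] = df.groupby("episode_index")["timestamp"].transform(lambda x: x - x.iloc[0])
    print(df)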
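
Likewise for the episode_data_index construction at the end of the hunk: from_ holds each episode's first global row index, and to_ the exclusive end bound, obtained by shifting from_ left by one and appending the table length. With the toy table above (episodes of length 3 and 2):

    import pandas as pd

    df = pd.DataFrame({"episode_index": [0, 0, 0, 1, 1]})
    df["index"] = df.index

    first_ep_index_df = df.groupby("episode_index").agg(start_index=("index", "first")).reset_index()
    from_ = first_ep_index_df["start_index"].tolist()  # [0, 3]
    to_ = from_[1:] + [len(df)]                        # [3, 5]
    # episode i spans rows from_[i] (inclusive) to to_[i] (exclusive)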