commit 4843988d81
parent 772927616a

    fix
@@ -18,8 +18,6 @@ Contains utilities to process raw data format from dora-record
 """
 
 import logging
-import re
-import shutil
 from pathlib import Path
 
 import pandas as pd
@@ -35,7 +33,7 @@ from lerobot.common.utils.utils import init_logging
 
 def check_format(raw_dir) -> bool:
     # TODO(rcadene): remove hardcoding
-    raw_dir = raw_dir / "018f9c37-c092-72fd-bd83-6f5a5c1b59d2"
+    raw_dir = raw_dir / "018f9fdc-6b7b-7432-a529-40d2cc718032"
     assert raw_dir.exists()
 
     leader_file = list(raw_dir.glob("*.parquet"))
@@ -46,7 +44,7 @@ def check_format(raw_dir) -> bool:
 
 def load_from_raw(raw_dir: Path, out_dir: Path):
     # TODO(rcadene): remove hardcoding
-    raw_dir = raw_dir / "018f9c37-c092-72fd-bd83-6f5a5c1b59d2"
+    raw_dir = raw_dir / "018f9fdc-6b7b-7432-a529-40d2cc718032"
 
     # Load the data stream that will be used as the reference for timestamp synchronization
     reference_key = "observation.images.cam_right_wrist"
@@ -54,82 +52,85 @@ def load_from_raw(raw_dir: Path, out_dir: Path):
     reference_df = reference_df[["timestamp_utc", reference_key]]
 
     # Merge all data streams using the nearest backward strategy
-    data_df = reference_df
+    df = reference_df
     for path in raw_dir.glob("*.parquet"):
         key = path.stem  # action or observation.state or ...
         if key == reference_key:
             continue
-        df = pd.read_parquet(path)
-        df = df[["timestamp_utc", key]]
-        data_df = pd.merge_asof(
-            data_df,
-            df,
+        modality_df = pd.read_parquet(path)
+        modality_df = modality_df[["timestamp_utc", key]]
+        df = pd.merge_asof(
+            df,
+            modality_df,
             on="timestamp_utc",
             direction="backward",
         )
-    # dora only uses arrays, so single values are encapsulated into a list
-    data_df["episode_index"] = data_df["episode_index"].map(lambda x: x[0])
-    data_df["frame_index"] = data_df.groupby("episode_index").cumcount()
-    data_df["index"] = data_df.index
-
-    # set 'next.done' to True for the last frame of each episode
-    data_df["next.done"] = False
-    data_df.loc[data_df.groupby("episode_index").tail(1).index, "next.done"] = True
-
-    data_df["timestamp"] = data_df["timestamp_utc"].map(lambda x: x.timestamp())
-    # each episode starts with timestamp 0 to match the ones from the video
-    data_df["timestamp"] = data_df.groupby("episode_index")["timestamp"].transform(lambda x: x - x.iloc[0])
-
-    del data_df["timestamp_utc"]
 
     # Remove rows with a NaN in any column. It can happen during the first frames of an episode,
     # because some cameras didn't start recording yet.
-    data_df = data_df.dropna(axis=0)
+    df = df.dropna(axis=0)
+
+    # Remove rows with episode_index -1, which indicates a failed episode
+    df = df[df["episode_index"] != -1]
+
+    # dora only uses arrays, so single values are encapsulated into a list
+    df["episode_index"] = df["episode_index"].map(lambda x: x[0])
+    df["frame_index"] = df.groupby("episode_index").cumcount()
+    df = df.reset_index()
+    df["index"] = df.index
+
+    # set 'next.done' to True for the last frame of each episode
+    df["next.done"] = False
+    df.loc[df.groupby("episode_index").tail(1).index, "next.done"] = True
+
+    df["timestamp"] = df["timestamp_utc"].map(lambda x: x.timestamp())
+    # each episode starts with timestamp 0 to match the ones from the video
+    df["timestamp"] = df.groupby("episode_index")["timestamp"].transform(lambda x: x - x.iloc[0])
+
+    del df["timestamp_utc"]
+
+    # sanity check that episode indices go from 0 to n-1
+    ep_ids = [ep_idx for ep_idx, _ in df.groupby("episode_index")]
+    expected_ep_ids = list(range(df["episode_index"].max() + 1))
+    assert ep_ids == expected_ep_ids, f"Episode indices are {ep_ids} instead of {expected_ep_ids}"
 
     # Create symlink to raw videos directory (it needs to be absolute, not relative)
-    # out_dir.mkdir(parents=True, exist_ok=True)
-    # absolute_videos_dir = (raw_dir / "videos").absolute()
-    # (out_dir / "videos").symlink_to(absolute_videos_dir)
+    out_dir.mkdir(parents=True, exist_ok=True)
+    videos_dir = out_dir / "videos"
+    videos_dir.symlink_to((raw_dir / "videos").absolute())
 
-    # TODO(rcadene): remove before merge
-    (out_dir / "videos").mkdir(parents=True, exist_ok=True)
-    for from_path in (raw_dir / "videos").glob("*.mp4"):
-        match = re.search(r"_(\d+)\.mp4$", from_path.name)
-        if not match:
-            raise ValueError(from_path.name)
-        ep_idx = match.group(1)
-        to_path = out_dir / "videos" / from_path.name.replace(ep_idx, f"{int(ep_idx):06d}")
-        shutil.copy2(from_path, to_path)
+    # sanity check that the video paths are well formatted
+    for key in df:
+        if "observation.images." not in key:
+            continue
+        for ep_idx in ep_ids:
+            video_path = videos_dir / f"{key}_episode_{ep_idx:06d}.mp4"
+            assert video_path.exists(), f"Video file not found in {video_path}"
 
     data_dict = {}
-    for key in data_df:
+    for key in df:
         # is video frame
         if "observation.images." in key:
             # we need `[0]` because dora only uses arrays, so single values are encapsulated into a list.
             # it is the case for the video_frame dictionary = [{"path": ..., "timestamp": ...}]
-            data_dict[key] = [video_frame[0] for video_frame in data_df[key].values]
+            data_dict[key] = [video_frame[0] for video_frame in df[key].values]
 
-            # TODO(rcadene): remove before merge
-            for item in data_dict[key]:
-                path = item["path"]
-                match = re.search(r"_(\d+)\.mp4$", path)
-                if not match:
-                    raise ValueError(path)
-                ep_idx = match.group(1)
-                item["path"] = path.replace(ep_idx, f"{int(ep_idx):06d}")
+            # sanity check that the video path is well formatted
+            video_path = videos_dir.parent / data_dict[key][0]["path"]
+            assert video_path.exists(), f"Video file not found in {video_path}"
         # is number
-        elif data_df[key].iloc[0].ndim == 0 or data_df[key].iloc[0].shape[0] == 1:
-            data_dict[key] = torch.from_numpy(data_df[key].values)
+        elif df[key].iloc[0].ndim == 0 or df[key].iloc[0].shape[0] == 1:
+            data_dict[key] = torch.from_numpy(df[key].values)
         # is vector
-        elif data_df[key].iloc[0].shape[0] > 1:
-            data_dict[key] = torch.stack([torch.from_numpy(x.copy()) for x in data_df[key].values])
+        elif df[key].iloc[0].shape[0] > 1:
+            data_dict[key] = torch.stack([torch.from_numpy(x.copy()) for x in df[key].values])
         else:
             raise ValueError(key)
 
-    # Get the episode index containing for each unique episode index
-    first_ep_index_df = data_df.groupby("episode_index").agg(start_index=("index", "first")).reset_index()
+    # Get the first data index for each unique episode index
+    first_ep_index_df = df.groupby("episode_index").agg(start_index=("index", "first")).reset_index()
     from_ = first_ep_index_df["start_index"].tolist()
-    to_ = from_[1:] + [len(data_df)]
+    to_ = from_[1:] + [len(df)]
     episode_data_index = {
         "from": from_,
         "to": to_,
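Note on the merge strategy: pd.merge_asof(..., direction="backward") attaches to each reference row the most recent row of the other stream at or before its timestamp. A minimal runnable sketch, with invented timestamps and placeholder values (only the column names mirror the script):

from datetime import datetime, timedelta

import pandas as pd

t0 = datetime(2024, 5, 21, 12, 0, 0)

# Reference stream: one row per camera frame (the synchronization anchor).
reference_df = pd.DataFrame(
    {
        "timestamp_utc": [t0 + timedelta(milliseconds=ms) for ms in (0, 33, 66)],
        "observation.images.cam_right_wrist": ["frame0", "frame1", "frame2"],
    }
)

# A slower stream (e.g. robot state) sampled at its own instants.
modality_df = pd.DataFrame(
    {
        "timestamp_utc": [t0 + timedelta(milliseconds=ms) for ms in (10, 40)],
        "observation.state": ["s0", "s1"],
    }
)

# For each reference row, take the most recent modality row at or before it.
df = pd.merge_asof(reference_df, modality_df, on="timestamp_utc", direction="backward")
print(df)

The first camera frame predates the first state sample, so it comes out as NaN; that is exactly the case the script's later dropna(axis=0) removes.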
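The per-episode bookkeeping added by this commit leans on three pandas groupby idioms. A self-contained sketch over an invented two-episode table:

import pandas as pd

# Invented two-episode table; column names mirror the script's.
df = pd.DataFrame(
    {
        "episode_index": [0, 0, 1, 1, 1],
        "timestamp": [100.0, 100.1, 200.0, 200.1, 200.2],  # absolute seconds
    }
)

# frame_index restarts at 0 inside each episode.
df["frame_index"] = df.groupby("episode_index").cumcount()

# next.done flags only the last frame of each episode.
df["next.done"] = False
df.loc[df.groupby("episode_index").tail(1).index, "next.done"] = True

# Rebase timestamps so each episode starts at 0, matching the videos.
df["timestamp"] = df.groupby("episode_index")["timestamp"].transform(lambda x: x - x.iloc[0])

print(df)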
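Finally, episode_data_index encodes each episode as a half-open range [from_[i], to_[i]) over the global frame index. A small sketch with invented data shows the invariant:

import pandas as pd

df = pd.DataFrame({"episode_index": [0, 0, 0, 1, 1], "index": range(5)})

first_ep_index_df = df.groupby("episode_index").agg(start_index=("index", "first")).reset_index()
from_ = first_ep_index_df["start_index"].tolist()  # [0, 3]
to_ = from_[1:] + [len(df)]                        # [3, 5]

# Episode i occupies the half-open index range [from_[i], to_[i]).
for ep_idx, (start, end) in enumerate(zip(from_, to_)):
    assert (df["episode_index"].iloc[start:end] == ep_idx).all()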