delete go1_gym_deploy folder

This commit is contained in:
Liao Dengting 2024-03-07 15:07:08 +08:00
parent 34339ef68e
commit c3a9bf7a3e
37 changed files with 0 additions and 2646 deletions

View File

@ -1,5 +0,0 @@
#!/bin/bash
sudo docker stop foxy_controller || true
sudo docker rm foxy_controller || true
cd ~/go1_gym/go1_gym_deploy/docker/
sudo make autostart

View File

@ -1,6 +0,0 @@
#!/bin/bash
sudo docker stop foxy_controller || true
sudo docker rm foxy_controller || true
sudo kill $(ps aux |grep lcm_position | awk '{print $2}')
cd ~/go1_gym/go1_gym_deploy/unitree_legged_sdk_bin/
yes "" | sudo ./lcm_position &

View File

@ -1,190 +0,0 @@
# syntax=docker/dockerfile:experimental
FROM nvcr.io/nvidia/l4t-pytorch:r32.6.1-pth1.9-py3
#ENV NVIDIA_VISIBLE_DEVICES ${NVIDIA_VISIBLE_DEVICES:-all}
#ENV NVIDIA_DRIVER_CAPABILITIES ${NVIDIA_DRIVER_CAPABILITIES:+$NVIDIA_DRIVER_CAPABILITIES,}graphics
# add new sudo user
ENV USERNAME improbable
ENV HOME /home/$USERNAME
RUN useradd -m $USERNAME && \
echo "$USERNAME:$USERNAME" | chpasswd && \
usermod --shell /bin/bash $USERNAME && \
usermod -aG sudo $USERNAME && \
mkdir /etc/sudoers.d && \
echo "$USERNAME ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers.d/$USERNAME && \
chmod 0440 /etc/sudoers.d/$USERNAME && \
# Replace 1000 with your user/group id
usermod --uid 1000 $USERNAME && \
groupmod --gid 1000 $USERNAME
# install package
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
curl \
sudo \
less \
emacs \
apt-utils \
tzdata \
git \
tmux \
bash-completion \
command-not-found \
libglib2.0-0 \
gstreamer1.0-plugins-* \
libgstreamer1.0-* \
libgstreamer-plugins-*1.0-* \
&& \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections
#COPY config/nvidia_icd.json /usr/share/vulkan/icd.d/
USER root
#RUN apt-get update && apt-get install -y python3-pip && pip3 install torch==1.9.0+cu111 torchvision==0.10.0+cu111 torchaudio==0.9.0 -f https://download.pytorch.org/whl/torch_stable.html
# ==================================================================
# Useful Libraries for Development
# ------------------------------------------------------------------
#RUN apt update && apt install -y apt-transport-https ca-certificates curl software-properties-common
#RUN curl -fsSL https://download.sublimetext.com/sublimehq-pub.gpg | apt-key add - && add-apt-repository "deb https://download.sublimetext.com/ apt/stable/" && apt update && apt install sublime-text
# ==================================================================
# Python dependencies defined in requirements.txt
# ------------------------------------------------------------------
#RUN pip3 install --upgrade pip
# copy local requirements file for pip install python deps
#COPY ./requirements.txt /home/$USERNAME
#WORKDIR /home/$USERNAME
#RUN pip3 install -r requirements.txt
# LCM
RUN apt-get -y update && apt-get install -y make gcc-8 g++-8
RUN cd /home/$USERNAME && git clone https://github.com/lcm-proj/lcm.git && cd lcm && mkdir build && cd build && cmake .. && make -j && make install
RUN cd /home/$USERNAME/lcm/lcm-python && pip3 install -e .
RUN apt-get install -y vim
#RUN pip3 install pandas
# ROS
# ENV ROS_DISTRO melodic
RUN apt-get install -y gnupg
# COPY install_scripts/install_ros.sh /tmp/install_ros.sh
# RUN chmod +x /tmp/install_ros.sh
# RUN /tmp/install_ros.sh
# # bootstrap rosdep
# RUN rosdep init \
# && rosdep update
# # create catkin workspace
# ENV CATKIN_WS=/root/catkin_ws
# RUN bash /opt/ros/melodic/setup.bash
# RUN mkdir -p $CATKIN_WS/src
# WORKDIR ${CATKIN_WS}
# RUN catkin init
# RUN catkin config --extend /opt/ros/$ROS_DISTRO \
# --cmake-args -DCMAKE_BUILD_TYPE=Release -DCATKIN_ENABLE_TESTING=False
# WORKDIR $CATKIN_WS/src
RUN apt-get update && apt-get install -y freeglut3-dev libudev-dev
#COPY ./install_scripts/install_vision_opencv.sh /tmp/install_vision_opencv.sh
#RUN chmod +x /tmp/install_vision_opencv.sh
#RUN /tmp/install_vision_opencv.sh
RUN apt-get install -y libgl1-mesa-dev libudev1 libudev-dev
#RUN apt-get install unzip
#
#RUN cd ~ && \
# wget -O opencv.zip https://github.com/opencv/opencv/archive/4.5.1.zip && \
# wget -O opencv_contrib.zip https://github.com/opencv/opencv_contrib/archive/4.5.1.zip && \
# unzip opencv.zip && \
# unzip opencv_contrib.zip && \
# mv opencv-4.5.1 opencv && \
# mv opencv_contrib-4.5.1 opencv_contrib && \
# rm opencv.zip && \
# rm opencv_contrib.zip
#
#RUN cd ~/opencv && \
# mkdir build && \
# cd build && \
# cmake -D CMAKE_BUILD_TYPE=RELEASE \
# -D CMAKE_INSTALL_PREFIX=/usr \
# -D OPENCV_EXTRA_MODULES_PATH=~/opencv_contrib/modules \
# -D EIGEN_INCLUDE_PATH=/usr/include/eigen3 \
# -D WITH_OPENCL=OFF \
# -D WITH_CUDA=OFF \
# -D CUDA_ARCH_BIN=5.3 \
# -D CUDA_ARCH_PTX="" \
# -D WITH_CUDNN=OFF \
# -D WITH_CUBLAS=OFF \
# -D ENABLE_FAST_MATH=ON \
# -D CUDA_FAST_MATH=OFF \
# -D OPENCV_DNN_CUDA=OFF \
# -D ENABLE_NEON=ON \
# -D WITH_QT=OFF \
# -D WITH_OPENMP=ON \
# -D WITH_OPENGL=ON \
# -D BUILD_TIFF=ON \
# -D WITH_FFMPEG=ON \
# -D WITH_GSTREAMER=ON \
# -D WITH_TBB=ON \
# -D BUILD_TBB=ON \
# -D BUILD_TESTS=OFF \
# -D WITH_EIGEN=ON \
# -D WITH_V4L=ON \
# -D WITH_LIBV4L=ON \
# -D OPENCV_ENABLE_NONFREE=ON \
# -D INSTALL_C_EXAMPLES=OFF \
# -D INSTALL_PYTHON_EXAMPLES=OFF \
# -D BUILD_NEW_PYTHON_SUPPORT=ON \
# -D BUILD_opencv_python3=TRUE \
# -D OPENCV_GENERATE_PKGCONFIG=ON \
# -D BUILD_EXAMPLES=OFF .. && \
# make -j4 && cd ~ && \
# # sudo rm -r /usr/include/opencv4/opencv2 && \
# cd ~/opencv/build && \
# sudo make install && \
# sudo ldconfig && \
# make clean && \
# sudo apt-get update
RUN apt-get install -y libgtk2.0-dev pkg-config
RUN pip3 install opencv-python opencv-contrib-python
####################################################################################
###### START HERE -- Install whatever dependencies you need specific to this project!
####################################################################################
#COPY ./rsc/IsaacGym_Preview_2_Package.tar.gz /home/$USERNAME/
#RUN cd /home/$USERNAME && tar -xvzf IsaacGym_Preview_2_Package.tar.gz
#COPY ./rsc/learning_to_walk_in_minutes.zip /home/$USERNAME/
#RUN apt-get install unzip && cd /home/$USERNAME/ && unzip learning_to_walk_in_minutes.zip && cd ./code/rl-pytorch && pip3 install -e .
#RUN cd /home/$USERNAME/isaacgym/python && pip3 install -e .
#RUN cd /home/$USERNAME/code/isaacgym_anymal && pip3 install -e .
#COPY ./src/isaacgym_anymal/ /home/$USERNAME/code/isaacgym_anymal/
# setup entrypoint
COPY entrypoint.sh /
ENTRYPOINT ["/entrypoint.sh"]
CMD ["bash"]

View File

@ -1,39 +0,0 @@
default: build
build:
docker build -t jetson-model-deployment .
clean-build:
docker build -t jetson-model-deployment . --no-cache=true
run:
docker stop foxy_controller || true
docker rm foxy_controller || true
docker run -it \
--env="DISPLAY" \
--env="QT_X11_NO_MITSHM=1" \
--volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" \
--env="XAUTHORITY=${XAUTH}" \
--volume="${XAUTH}:${XAUTH}" \
--volume="/home/unitree/go1_gym:/home/isaac/go1_gym" \
--privileged \
--runtime=nvidia \
--net=host \
--workdir="/home/isaac/go1_gym" \
--name="foxy_controller" \
jetson-model-deployment bash
autostart:
docker stop foxy_controller || true
docker rm foxy_controller || true
docker run -d\
--env="DISPLAY" \
--env="QT_X11_NO_MITSHM=1" \
--volume="/tmp/.X11-unix:/tmp/.X11-unix:rw" \
--env="XAUTHORITY=${XAUTH}" \
--volume="${XAUTH}:${XAUTH}" \
--volume="/home/unitree/go1_gym:/home/isaac/go1_gym" \
--privileged \
--runtime=nvidia \
--net=host \
--workdir="/home/isaac/go1_gym" \
--name="foxy_controller" \
jetson-model-deployment tail -f /dev/null
docker start foxy_controller
docker exec foxy_controller bash -c 'cd /home/isaac/go1_gym/ && python3 setup.py install && cd go1_gym_deploy/scripts && ls && python3 deploy_policy.py'

View File

@ -1,2 +0,0 @@
#!/bin/bash
docker load -i deployment_image.tar

View File

@ -1,2 +0,0 @@
#!/bin/bash
docker save -o deployment_image.tar jetson-model-deployment:latest

View File

@ -1,51 +0,0 @@
# import isaacgym
# assert isaacgym, "import isaacgym before pytorch"
import torch
class HistoryWrapper:
def __init__(self, env):
self.env = env
if isinstance(self.env.cfg, dict):
self.obs_history_length = self.env.cfg["env"]["num_observation_history"]
else:
self.obs_history_length = self.env.cfg.env.num_observation_history
self.num_obs_history = self.obs_history_length * self.env.num_obs
self.obs_history = torch.zeros(self.env.num_envs, self.num_obs_history, dtype=torch.float,
device=self.env.device, requires_grad=False)
self.num_privileged_obs = self.env.num_privileged_obs
def step(self, action):
obs, rew, done, info = self.env.step(action)
privileged_obs = info["privileged_obs"]
self.obs_history = torch.cat((self.obs_history[:, self.env.num_obs:], obs), dim=-1)
return {'obs': obs, 'privileged_obs': privileged_obs, 'obs_history': self.obs_history}, rew, done, info
def get_observations(self):
obs = self.env.get_observations()
privileged_obs = self.env.get_privileged_observations()
self.obs_history = torch.cat((self.obs_history[:, self.env.num_obs:], obs), dim=-1)
return {'obs': obs, 'privileged_obs': privileged_obs, 'obs_history': self.obs_history}
def get_obs(self):
obs = self.env.get_obs()
privileged_obs = self.env.get_privileged_observations()
self.obs_history = torch.cat((self.obs_history[:, self.env.num_obs:], obs), dim=-1)
return {'obs': obs, 'privileged_obs': privileged_obs, 'obs_history': self.obs_history}
def reset_idx(self, env_ids): # it might be a problem that this isn't getting called!!
ret = self.env.reset_idx(env_ids)
self.obs_history[env_ids, :] = 0
return ret
def reset(self):
ret = self.env.reset()
privileged_obs = self.env.get_privileged_observations()
self.obs_history[:, :] = 0
return {"obs": ret, "privileged_obs": privileged_obs, "obs_history": self.obs_history}
def __getattr__(self, name):
return getattr(self.env, name)

View File

@ -1,301 +0,0 @@
import time
import lcm
import numpy as np
import torch
import cv2
from go1_gym_deploy.lcm_types.pd_tau_targets_lcmt import pd_tau_targets_lcmt
lc = lcm.LCM("udpm://239.255.76.67:7667?ttl=255")
def class_to_dict(obj) -> dict:
if not hasattr(obj, "__dict__"):
return obj
result = {}
for key in dir(obj):
if key.startswith("_") or key == "terrain":
continue
element = []
val = getattr(obj, key)
if isinstance(val, list):
for item in val:
element.append(class_to_dict(item))
else:
element = class_to_dict(val)
result[key] = element
return result
class LCMAgent():
def __init__(self, cfg, se, command_profile):
if not isinstance(cfg, dict):
cfg = class_to_dict(cfg)
self.cfg = cfg
self.se = se
self.command_profile = command_profile
self.dt = self.cfg["control"]["decimation"] * self.cfg["sim"]["dt"]
self.timestep = 0
self.num_obs = self.cfg["env"]["num_observations"]
self.num_envs = 1
self.num_privileged_obs = self.cfg["env"]["num_privileged_obs"]
self.num_actions = self.cfg["env"]["num_actions"]
self.num_commands = self.cfg["commands"]["num_commands"]
self.device = 'cpu'
if "obs_scales" in self.cfg.keys():
self.obs_scales = self.cfg["obs_scales"]
else:
self.obs_scales = self.cfg["normalization"]["obs_scales"]
self.commands_scale = np.array(
[self.obs_scales["lin_vel"], self.obs_scales["lin_vel"],
self.obs_scales["ang_vel"], self.obs_scales["body_height_cmd"], 1, 1, 1, 1, 1,
self.obs_scales["footswing_height_cmd"], self.obs_scales["body_pitch_cmd"],
# 0, self.obs_scales["body_pitch_cmd"],
self.obs_scales["body_roll_cmd"], self.obs_scales["stance_width_cmd"],
self.obs_scales["stance_length_cmd"], self.obs_scales["aux_reward_cmd"], 1, 1, 1, 1, 1, 1
])[:self.num_commands]
joint_names = [
"FL_hip_joint", "FL_thigh_joint", "FL_calf_joint",
"FR_hip_joint", "FR_thigh_joint", "FR_calf_joint",
"RL_hip_joint", "RL_thigh_joint", "RL_calf_joint",
"RR_hip_joint", "RR_thigh_joint", "RR_calf_joint", ]
self.default_dof_pos = np.array([self.cfg["init_state"]["default_joint_angles"][name] for name in joint_names])
try:
self.default_dof_pos_scale = np.array([self.cfg["init_state"]["default_hip_scales"], self.cfg["init_state"]["default_thigh_scales"], self.cfg["init_state"]["default_calf_scales"],
self.cfg["init_state"]["default_hip_scales"], self.cfg["init_state"]["default_thigh_scales"], self.cfg["init_state"]["default_calf_scales"],
self.cfg["init_state"]["default_hip_scales"], self.cfg["init_state"]["default_thigh_scales"], self.cfg["init_state"]["default_calf_scales"],
self.cfg["init_state"]["default_hip_scales"], self.cfg["init_state"]["default_thigh_scales"], self.cfg["init_state"]["default_calf_scales"]])
except KeyError:
self.default_dof_pos_scale = np.ones(12)
self.default_dof_pos = self.default_dof_pos * self.default_dof_pos_scale
self.p_gains = np.zeros(12)
self.d_gains = np.zeros(12)
for i in range(12):
joint_name = joint_names[i]
found = False
for dof_name in self.cfg["control"]["stiffness"].keys():
if dof_name in joint_name:
self.p_gains[i] = self.cfg["control"]["stiffness"][dof_name]
self.d_gains[i] = self.cfg["control"]["damping"][dof_name]
found = True
if not found:
self.p_gains[i] = 0.
self.d_gains[i] = 0.
if self.cfg["control"]["control_type"] in ["P", "V"]:
print(f"PD gain of joint {joint_name} were not defined, setting them to zero")
print(f"p_gains: {self.p_gains}")
self.commands = np.zeros((1, self.num_commands))
self.actions = torch.zeros(12)
self.last_actions = torch.zeros(12)
self.gravity_vector = np.zeros(3)
self.dof_pos = np.zeros(12)
self.dof_vel = np.zeros(12)
self.body_linear_vel = np.zeros(3)
self.body_angular_vel = np.zeros(3)
self.joint_pos_target = np.zeros(12)
self.joint_vel_target = np.zeros(12)
self.torques = np.zeros(12)
self.contact_state = np.ones(4)
self.joint_idxs = self.se.joint_idxs
self.gait_indices = torch.zeros(self.num_envs, dtype=torch.float)
self.clock_inputs = torch.zeros(self.num_envs, 4, dtype=torch.float)
if "obs_scales" in self.cfg.keys():
self.obs_scales = self.cfg["obs_scales"]
else:
self.obs_scales = self.cfg["normalization"]["obs_scales"]
self.is_currently_probing = False
def set_probing(self, is_currently_probing):
self.is_currently_probing = is_currently_probing
def get_obs(self):
self.gravity_vector = self.se.get_gravity_vector()
cmds, reset_timer = self.command_profile.get_command(self.timestep * self.dt, probe=self.is_currently_probing)
self.commands[:, :] = cmds[:self.num_commands]
if reset_timer:
self.reset_gait_indices()
#else:
# self.commands[:, 0:3] = self.command_profile.get_command(self.timestep * self.dt)[0:3]
self.dof_pos = self.se.get_dof_pos()
self.dof_vel = self.se.get_dof_vel()
self.body_linear_vel = self.se.get_body_linear_vel()
self.body_angular_vel = self.se.get_body_angular_vel()
ob = np.concatenate((self.gravity_vector.reshape(1, -1),
self.commands * self.commands_scale,
(self.dof_pos - self.default_dof_pos).reshape(1, -1) * self.obs_scales["dof_pos"],
self.dof_vel.reshape(1, -1) * self.obs_scales["dof_vel"],
torch.clip(self.actions, -self.cfg["normalization"]["clip_actions"],
self.cfg["normalization"]["clip_actions"]).cpu().detach().numpy().reshape(1, -1)
), axis=1)
if self.cfg["env"]["observe_two_prev_actions"]:
ob = np.concatenate((ob,
self.last_actions.cpu().detach().numpy().reshape(1, -1)), axis=1)
if self.cfg["env"]["observe_clock_inputs"]:
ob = np.concatenate((ob,
self.clock_inputs), axis=1)
# print(self.clock_inputs)
if self.cfg["env"]["observe_vel"]:
ob = np.concatenate(
(self.body_linear_vel.reshape(1, -1) * self.obs_scales["lin_vel"],
self.body_angular_vel.reshape(1, -1) * self.obs_scales["ang_vel"],
ob), axis=1)
if self.cfg["env"]["observe_only_lin_vel"]:
ob = np.concatenate(
(self.body_linear_vel.reshape(1, -1) * self.obs_scales["lin_vel"],
ob), axis=1)
if self.cfg["env"]["observe_yaw"]:
heading = self.se.get_yaw()
ob = np.concatenate((ob, heading.reshape(1, -1)), axis=-1)
self.contact_state = self.se.get_contact_state()
if "observe_contact_states" in self.cfg["env"].keys() and self.cfg["env"]["observe_contact_states"]:
ob = np.concatenate((ob, self.contact_state.reshape(1, -1)), axis=-1)
if "terrain" in self.cfg.keys() and self.cfg["terrain"]["measure_heights"]:
robot_height = 0.25
self.measured_heights = np.zeros(
(len(self.cfg["terrain"]["measured_points_x"]), len(self.cfg["terrain"]["measured_points_y"]))).reshape(
1, -1)
heights = np.clip(robot_height - 0.5 - self.measured_heights, -1, 1.) * self.obs_scales["height_measurements"]
ob = np.concatenate((ob, heights), axis=1)
return torch.tensor(ob, device=self.device).float()
def get_privileged_observations(self):
return None
def publish_action(self, action, hard_reset=False):
command_for_robot = pd_tau_targets_lcmt()
self.joint_pos_target = \
(action[0, :12].detach().cpu().numpy() * self.cfg["control"]["action_scale"]).flatten()
self.joint_pos_target[[0, 3, 6, 9]] *= self.cfg["control"]["hip_scale_reduction"]
# self.joint_pos_target[[0, 3, 6, 9]] *= -1
self.joint_pos_target = self.joint_pos_target
self.joint_pos_target += self.default_dof_pos
joint_pos_target = self.joint_pos_target[self.joint_idxs]
self.joint_vel_target = np.zeros(12)
# print(f'cjp {self.joint_pos_target}')
command_for_robot.q_des = joint_pos_target
command_for_robot.qd_des = self.joint_vel_target
command_for_robot.kp = self.p_gains
command_for_robot.kd = self.d_gains
command_for_robot.tau_ff = np.zeros(12)
command_for_robot.se_contactState = np.zeros(4)
command_for_robot.timestamp_us = int(time.time() * 10 ** 6)
command_for_robot.id = 0
if hard_reset:
command_for_robot.id = -1
self.torques = (self.joint_pos_target - self.dof_pos) * self.p_gains + (self.joint_vel_target - self.dof_vel) * self.d_gains
lc.publish("pd_plustau_targets", command_for_robot.encode())
def reset(self):
self.actions = torch.zeros(12)
self.time = time.time()
self.timestep = 0
return self.get_obs()
def reset_gait_indices(self):
self.gait_indices = torch.zeros(self.num_envs, dtype=torch.float)
def step(self, actions, hard_reset=False):
clip_actions = self.cfg["normalization"]["clip_actions"]
self.last_actions = self.actions[:]
self.actions = torch.clip(actions[0:1, :], -clip_actions, clip_actions)
self.publish_action(self.actions, hard_reset=hard_reset)
time.sleep(max(self.dt - (time.time() - self.time), 0))
if self.timestep % 100 == 0: print(f'frq: {1 / (time.time() - self.time)} Hz');
self.time = time.time()
obs = self.get_obs()
# clock accounting
frequencies = self.commands[:, 4]
phases = self.commands[:, 5]
offsets = self.commands[:, 6]
if self.num_commands == 8:
bounds = 0
durations = self.commands[:, 7]
else:
bounds = self.commands[:, 7]
durations = self.commands[:, 8]
self.gait_indices = torch.remainder(self.gait_indices + self.dt * frequencies, 1.0)
if "pacing_offset" in self.cfg["commands"] and self.cfg["commands"]["pacing_offset"]:
self.foot_indices = [self.gait_indices + phases + offsets + bounds,
self.gait_indices + bounds,
self.gait_indices + offsets,
self.gait_indices + phases]
else:
self.foot_indices = [self.gait_indices + phases + offsets + bounds,
self.gait_indices + offsets,
self.gait_indices + bounds,
self.gait_indices + phases]
self.clock_inputs[:, 0] = torch.sin(2 * np.pi * self.foot_indices[0])
self.clock_inputs[:, 1] = torch.sin(2 * np.pi * self.foot_indices[1])
self.clock_inputs[:, 2] = torch.sin(2 * np.pi * self.foot_indices[2])
self.clock_inputs[:, 3] = torch.sin(2 * np.pi * self.foot_indices[3])
images = {'front': self.se.get_camera_front(),
'bottom': self.se.get_camera_bottom(),
'rear': self.se.get_camera_rear(),
'left': self.se.get_camera_left(),
'right': self.se.get_camera_right()
}
downscale_factor = 2
temporal_downscale = 3
for k, v in images.items():
if images[k] is not None:
images[k] = cv2.resize(images[k], dsize=(images[k].shape[0]//downscale_factor, images[k].shape[1]//downscale_factor), interpolation=cv2.INTER_CUBIC)
if self.timestep % temporal_downscale != 0:
images[k] = None
#print(self.commands)
infos = {"joint_pos": self.dof_pos[np.newaxis, :],
"joint_vel": self.dof_vel[np.newaxis, :],
"joint_pos_target": self.joint_pos_target[np.newaxis, :],
"joint_vel_target": self.joint_vel_target[np.newaxis, :],
"body_linear_vel": self.body_linear_vel[np.newaxis, :],
"body_angular_vel": self.body_angular_vel[np.newaxis, :],
"contact_state": self.contact_state[np.newaxis, :],
"clock_inputs": self.clock_inputs[np.newaxis, :],
"body_linear_vel_cmd": self.commands[:, 0:2],
"body_angular_vel_cmd": self.commands[:, 2:],
"privileged_obs": None,
"camera_image_front": images['front'],
"camera_image_bottom": images['bottom'],
"camera_image_rear": images['rear'],
"camera_image_left": images['left'],
"camera_image_right": images['right'],
}
self.timestep += 1
return obs, None, None, infos

View File

@ -1,22 +0,0 @@
#!/bin/bash
echo "======================================"
echo "== Go1 Sim-to-Real Installation Kit =="
echo "======================================"
echo ""
echo "Author: Gabriel Margolis, Improbable AI Lab, MIT"
echo "This software is intended to support controls research. It includes safety features but may still damage your Go1. The user assumes all risk."
echo ""
read -r -p "Extract docker container? [y/N] " response
if [[ "$response" =~ ^([yY][eE][sS]|[yY])$ ]]
then
# load docker image
echo "[Step 1] Extracting docker image..."
docker load -i ../scripts/deployment_image.tar
printf "\nDone!\n"
else
echo "Quitting"
fi

View File

@ -1,60 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class camera_message_lcmt(object):
__slots__ = ["data"]
__typenames__ = ["byte"]
__dimensions__ = [[278400]]
def __init__(self):
self.data = ""
def encode(self):
buf = BytesIO()
buf.write(camera_message_lcmt._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(bytearray(self.data[:278400]))
def decode(data):
if hasattr(data, 'read'):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != camera_message_lcmt._get_packed_fingerprint():
raise ValueError("Decode error")
return camera_message_lcmt._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = camera_message_lcmt()
self.data = buf.read(278400)
return self
_decode_one = staticmethod(_decode_one)
_hash = None
def _get_hash_recursive(parents):
if camera_message_lcmt in parents: return 0
tmphash = (0x1610a8a9f4d174b7) & 0xffffffffffffffff
tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if camera_message_lcmt._packed_fingerprint is None:
camera_message_lcmt._packed_fingerprint = struct.pack(">Q", camera_message_lcmt._get_hash_recursive([]))
return camera_message_lcmt._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)

View File

@ -1,57 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class camera_message_rect_wide(object):
__slots__ = ["data"]
def __init__(self):
self.data = ""
def encode(self):
buf = BytesIO()
buf.write(camera_message_rect_wide._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(bytearray(self.data[:34800]))
def decode(data):
if hasattr(data, 'read'):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != camera_message_rect_wide._get_packed_fingerprint():
raise ValueError("Decode error")
return camera_message_rect_wide._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = camera_message_rect_wide()
self.data = buf.read(34800)
return self
_decode_one = staticmethod(_decode_one)
_hash = None
def _get_hash_recursive(parents):
if camera_message_rect_wide in parents: return 0
tmphash = (0xc3e9f058530b2a8b) & 0xffffffffffffffff
tmphash = (((tmphash<<1)&0xffffffffffffffff) + (tmphash>>63)) & 0xffffffffffffffff
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if camera_message_rect_wide._packed_fingerprint is None:
camera_message_rect_wide._packed_fingerprint = struct.pack(">Q", camera_message_rect_wide._get_hash_recursive([]))
return camera_message_rect_wide._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)

View File

@ -1,11 +0,0 @@
struct leg_control_data_lcmt
{
float q[12];
float qd[12];
float p[12];
float v[12];
float tau_est[12];
int64_t timestamp_us;
int64_t id;
int64_t robot_id;
}

View File

@ -1,85 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class leg_control_data_lcmt(object):
__slots__ = ["q", "qd", "p", "v", "tau_est", "timestamp_us", "id", "robot_id"]
__typenames__ = ["float", "float", "float", "float", "float", "int64_t", "int64_t", "int64_t"]
__dimensions__ = [[12], [12], [12], [12], [12], None, None, None]
def __init__(self):
self.q = [0.0 for dim0 in range(12)]
self.qd = [0.0 for dim0 in range(12)]
self.p = [0.0 for dim0 in range(12)]
self.v = [0.0 for dim0 in range(12)]
self.tau_est = [0.0 for dim0 in range(12)]
self.timestamp_us = 0
self.id = 0
self.robot_id = 0
def encode(self):
buf = BytesIO()
buf.write(leg_control_data_lcmt._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(struct.pack('>12f', *self.q[:12]))
buf.write(struct.pack('>12f', *self.qd[:12]))
buf.write(struct.pack('>12f', *self.p[:12]))
buf.write(struct.pack('>12f', *self.v[:12]))
buf.write(struct.pack('>12f', *self.tau_est[:12]))
buf.write(struct.pack(">qqq", self.timestamp_us, self.id, self.robot_id))
def decode(data):
if hasattr(data, 'read'):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != leg_control_data_lcmt._get_packed_fingerprint():
raise ValueError("Decode error")
return leg_control_data_lcmt._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = leg_control_data_lcmt()
self.q = struct.unpack('>12f', buf.read(48))
self.qd = struct.unpack('>12f', buf.read(48))
self.p = struct.unpack('>12f', buf.read(48))
self.v = struct.unpack('>12f', buf.read(48))
self.tau_est = struct.unpack('>12f', buf.read(48))
self.timestamp_us, self.id, self.robot_id = struct.unpack(">qqq", buf.read(24))
return self
_decode_one = staticmethod(_decode_one)
def _get_hash_recursive(parents):
if leg_control_data_lcmt in parents: return 0
tmphash = (0xa9a928b534bfc487) & 0xffffffffffffffff
tmphash = (((tmphash << 1) & 0xffffffffffffffff) + (tmphash >> 63)) & 0xffffffffffffffff
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if leg_control_data_lcmt._packed_fingerprint is None:
leg_control_data_lcmt._packed_fingerprint = struct.pack(">Q", leg_control_data_lcmt._get_hash_recursive([]))
return leg_control_data_lcmt._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)
def get_hash(self):
"""Get the LCM hash of the struct"""
return struct.unpack(">Q", leg_control_data_lcmt._get_packed_fingerprint())[0]

View File

@ -1,12 +0,0 @@
struct pd_tau_targets_lcmt
{
double q_des[12];
double qd_des[12];
double tau_ff[12];
double kp[12];
double kd[12];
int64_t timestamp_us;
int64_t id;
int64_t robot_id;
double se_contactState[4];
}

View File

@ -1,88 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class pd_tau_targets_lcmt(object):
__slots__ = ["q_des", "qd_des", "tau_ff", "kp", "kd", "timestamp_us", "id", "robot_id", "se_contactState"]
__typenames__ = ["double", "double", "double", "double", "double", "int64_t", "int64_t", "int64_t", "double"]
__dimensions__ = [[12], [12], [12], [12], [12], None, None, None, [4]]
def __init__(self):
self.q_des = [0.0 for dim0 in range(12)]
self.qd_des = [0.0 for dim0 in range(12)]
self.tau_ff = [0.0 for dim0 in range(12)]
self.kp = [0.0 for dim0 in range(12)]
self.kd = [0.0 for dim0 in range(12)]
self.timestamp_us = 0
self.id = 0
self.robot_id = 0
self.se_contactState = [0.0 for dim0 in range(4)]
def encode(self):
buf = BytesIO()
buf.write(pd_tau_targets_lcmt._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(struct.pack('>12d', *self.q_des[:12]))
buf.write(struct.pack('>12d', *self.qd_des[:12]))
buf.write(struct.pack('>12d', *self.tau_ff[:12]))
buf.write(struct.pack('>12d', *self.kp[:12]))
buf.write(struct.pack('>12d', *self.kd[:12]))
buf.write(struct.pack(">qqq", self.timestamp_us, self.id, self.robot_id))
buf.write(struct.pack('>4d', *self.se_contactState[:4]))
def decode(data):
if hasattr(data, 'read'):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != pd_tau_targets_lcmt._get_packed_fingerprint():
raise ValueError("Decode error")
return pd_tau_targets_lcmt._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = pd_tau_targets_lcmt()
self.q_des = struct.unpack('>12d', buf.read(96))
self.qd_des = struct.unpack('>12d', buf.read(96))
self.tau_ff = struct.unpack('>12d', buf.read(96))
self.kp = struct.unpack('>12d', buf.read(96))
self.kd = struct.unpack('>12d', buf.read(96))
self.timestamp_us, self.id, self.robot_id = struct.unpack(">qqq", buf.read(24))
self.se_contactState = struct.unpack('>4d', buf.read(32))
return self
_decode_one = staticmethod(_decode_one)
def _get_hash_recursive(parents):
if pd_tau_targets_lcmt in parents: return 0
tmphash = (0x6d88128ef1291cc1) & 0xffffffffffffffff
tmphash = (((tmphash << 1) & 0xffffffffffffffff) + (tmphash >> 63)) & 0xffffffffffffffff
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if pd_tau_targets_lcmt._packed_fingerprint is None:
pd_tau_targets_lcmt._packed_fingerprint = struct.pack(">Q", pd_tau_targets_lcmt._get_hash_recursive([]))
return pd_tau_targets_lcmt._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)
def get_hash(self):
"""Get the LCM hash of the struct"""
return struct.unpack(">Q", pd_tau_targets_lcmt._get_packed_fingerprint())[0]

View File

@ -1,13 +0,0 @@
struct rc_command_lcmt
{
int16_t mode;
float left_stick[2];
float right_stick[2];
float knobs[2];
int16_t left_upper_switch;
int16_t left_lower_left_switch;
int16_t left_lower_right_switch;
int16_t right_upper_switch;
int16_t right_lower_left_switch;
int16_t right_lower_right_switch;
}

View File

@ -1,90 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class rc_command_lcmt(object):
__slots__ = ["mode", "left_stick", "right_stick", "knobs", "left_upper_switch", "left_lower_left_switch",
"left_lower_right_switch", "right_upper_switch", "right_lower_left_switch", "right_lower_right_switch"]
__typenames__ = ["int16_t", "float", "float", "float", "int16_t", "int16_t", "int16_t", "int16_t", "int16_t",
"int16_t"]
__dimensions__ = [None, [2], [2], [2], None, None, None, None, None, None]
def __init__(self):
self.mode = 0
self.left_stick = [0.0 for dim0 in range(2)]
self.right_stick = [0.0 for dim0 in range(2)]
self.knobs = [0.0 for dim0 in range(2)]
self.left_upper_switch = 0
self.left_lower_left_switch = 0
self.left_lower_right_switch = 0
self.right_upper_switch = 0
self.right_lower_left_switch = 0
self.right_lower_right_switch = 0
def encode(self):
buf = BytesIO()
buf.write(rc_command_lcmt._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(struct.pack(">h", self.mode))
buf.write(struct.pack('>2f', *self.left_stick[:2]))
buf.write(struct.pack('>2f', *self.right_stick[:2]))
buf.write(struct.pack('>2f', *self.knobs[:2]))
buf.write(
struct.pack(">hhhhhh", self.left_upper_switch, self.left_lower_left_switch, self.left_lower_right_switch,
self.right_upper_switch, self.right_lower_left_switch, self.right_lower_right_switch))
def decode(data):
if hasattr(data, 'read'):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != rc_command_lcmt._get_packed_fingerprint():
raise ValueError("Decode error")
return rc_command_lcmt._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = rc_command_lcmt()
self.mode = struct.unpack(">h", buf.read(2))[0]
self.left_stick = struct.unpack('>2f', buf.read(8))
self.right_stick = struct.unpack('>2f', buf.read(8))
self.knobs = struct.unpack('>2f', buf.read(8))
self.left_upper_switch, self.left_lower_left_switch, self.left_lower_right_switch, self.right_upper_switch, self.right_lower_left_switch, self.right_lower_right_switch = struct.unpack(
">hhhhhh", buf.read(12))
return self
_decode_one = staticmethod(_decode_one)
def _get_hash_recursive(parents):
if rc_command_lcmt in parents: return 0
tmphash = (0xc7726b02ec3e7de2) & 0xffffffffffffffff
tmphash = (((tmphash << 1) & 0xffffffffffffffff) + (tmphash >> 63)) & 0xffffffffffffffff
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if rc_command_lcmt._packed_fingerprint is None:
rc_command_lcmt._packed_fingerprint = struct.pack(">Q", rc_command_lcmt._get_hash_recursive([]))
return rc_command_lcmt._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)
def get_hash(self):
"""Get the LCM hash of the struct"""
return struct.unpack(">Q", rc_command_lcmt._get_packed_fingerprint())[0]

View File

@ -1,16 +0,0 @@
struct state_estimator_lcmt
{
float p[3];
float vWorld[3];
float vBody[3];
float rpy[3];
float omegaBody[3];
float omegaWorld[3];
float quat[4];
float contact_estimate[4];
float aBody[3];
float aWorld[3];
int64_t timestamp_us;
int64_t id;
int64_t robot_id;
}

View File

@ -1,102 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class state_estimator_lcmt(object):
__slots__ = ["p", "vWorld", "vBody", "rpy", "omegaBody", "omegaWorld", "quat", "contact_estimate", "aBody",
"aWorld", "timestamp_us", "id", "robot_id"]
__typenames__ = ["float", "float", "float", "float", "float", "float", "float", "float", "float", "float",
"int64_t", "int64_t", "int64_t"]
__dimensions__ = [[3], [3], [3], [3], [3], [3], [4], [4], [3], [3], None, None, None]
def __init__(self):
self.p = [0.0 for dim0 in range(3)]
self.vWorld = [0.0 for dim0 in range(3)]
self.vBody = [0.0 for dim0 in range(3)]
self.rpy = [0.0 for dim0 in range(3)]
self.omegaBody = [0.0 for dim0 in range(3)]
self.omegaWorld = [0.0 for dim0 in range(3)]
self.quat = [0.0 for dim0 in range(4)]
self.contact_estimate = [0.0 for dim0 in range(4)]
self.aBody = [0.0 for dim0 in range(3)]
self.aWorld = [0.0 for dim0 in range(3)]
self.timestamp_us = 0
self.id = 0
self.robot_id = 0
def encode(self):
buf = BytesIO()
buf.write(state_estimator_lcmt._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(struct.pack('>3f', *self.p[:3]))
buf.write(struct.pack('>3f', *self.vWorld[:3]))
buf.write(struct.pack('>3f', *self.vBody[:3]))
buf.write(struct.pack('>3f', *self.rpy[:3]))
buf.write(struct.pack('>3f', *self.omegaBody[:3]))
buf.write(struct.pack('>3f', *self.omegaWorld[:3]))
buf.write(struct.pack('>4f', *self.quat[:4]))
buf.write(struct.pack('>4f', *self.contact_estimate[:4]))
buf.write(struct.pack('>3f', *self.aBody[:3]))
buf.write(struct.pack('>3f', *self.aWorld[:3]))
buf.write(struct.pack(">qqq", self.timestamp_us, self.id, self.robot_id))
def decode(data):
if hasattr(data, 'read'):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != state_estimator_lcmt._get_packed_fingerprint():
raise ValueError("Decode error")
return state_estimator_lcmt._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = state_estimator_lcmt()
self.p = struct.unpack('>3f', buf.read(12))
self.vWorld = struct.unpack('>3f', buf.read(12))
self.vBody = struct.unpack('>3f', buf.read(12))
self.rpy = struct.unpack('>3f', buf.read(12))
self.omegaBody = struct.unpack('>3f', buf.read(12))
self.omegaWorld = struct.unpack('>3f', buf.read(12))
self.quat = struct.unpack('>4f', buf.read(16))
self.contact_estimate = struct.unpack('>4f', buf.read(16))
self.aBody = struct.unpack('>3f', buf.read(12))
self.aWorld = struct.unpack('>3f', buf.read(12))
self.timestamp_us, self.id, self.robot_id = struct.unpack(">qqq", buf.read(24))
return self
_decode_one = staticmethod(_decode_one)
def _get_hash_recursive(parents):
if state_estimator_lcmt in parents: return 0
tmphash = (0xea87c8282effe5b6) & 0xffffffffffffffff
tmphash = (((tmphash << 1) & 0xffffffffffffffff) + (tmphash >> 63)) & 0xffffffffffffffff
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if state_estimator_lcmt._packed_fingerprint is None:
state_estimator_lcmt._packed_fingerprint = struct.pack(">Q", state_estimator_lcmt._get_hash_recursive([]))
return state_estimator_lcmt._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)
def get_hash(self):
"""Get the LCM hash of the struct"""
return struct.unpack(">Q", state_estimator_lcmt._get_packed_fingerprint())[0]

View File

@ -1,77 +0,0 @@
import glob
import pickle as pkl
import lcm
import sys
from go1_gym_deploy.utils.deployment_runner import DeploymentRunner
from go1_gym_deploy.envs.lcm_agent import LCMAgent
from go1_gym_deploy.utils.cheetah_state_estimator import StateEstimator
from go1_gym_deploy.utils.command_profile import *
import pathlib
lc = lcm.LCM("udpm://239.255.76.67:7667?ttl=255")
def load_and_run_policy(label, experiment_name, max_vel=1.0, max_yaw_vel=1.0):
# load agent
dirs = glob.glob(f"../../runs/{label}/*")
logdir = sorted(dirs)[0]
with open(logdir+"/parameters.pkl", 'rb') as file:
pkl_cfg = pkl.load(file)
print(pkl_cfg.keys())
cfg = pkl_cfg["Cfg"]
print(cfg.keys())
se = StateEstimator(lc)
control_dt = 0.02
command_profile = RCControllerProfile(dt=control_dt, state_estimator=se, x_scale=max_vel, y_scale=0.6, yaw_scale=max_yaw_vel)
hardware_agent = LCMAgent(cfg, se, command_profile)
se.spin()
from go1_gym_deploy.envs.history_wrapper import HistoryWrapper
hardware_agent = HistoryWrapper(hardware_agent)
policy = load_policy(logdir)
# load runner
root = f"{pathlib.Path(__file__).parent.resolve()}/../../logs/"
pathlib.Path(root).mkdir(parents=True, exist_ok=True)
deployment_runner = DeploymentRunner(experiment_name=experiment_name, se=None,
log_root=f"{root}/{experiment_name}")
deployment_runner.add_control_agent(hardware_agent, "hardware_closed_loop")
deployment_runner.add_policy(policy)
deployment_runner.add_command_profile(command_profile)
if len(sys.argv) >= 2:
max_steps = int(sys.argv[1])
else:
max_steps = 10000000
print(f'max steps {max_steps}')
deployment_runner.run(max_steps=max_steps, logging=True)
def load_policy(logdir):
body = torch.jit.load(logdir + '/checkpoints/body_latest.jit')
import os
adaptation_module = torch.jit.load(logdir + '/checkpoints/adaptation_module_latest.jit')
def policy(obs, info):
i = 0
latent = adaptation_module.forward(obs["obs_history"].to('cpu'))
action = body.forward(torch.cat((obs["obs_history"].to('cpu'), latent), dim=-1))
info['latent'] = latent
return action
return policy
if __name__ == '__main__':
label = "gait-conditioned-agility/pretrain-v0/train"
experiment_name = "example_experiment"
load_and_run_policy(label, experiment_name=experiment_name, max_vel=3.5, max_yaw_vel=5.0)

View File

@ -1,8 +0,0 @@
#!/bin/bash
# download docker image if it doesn't exist yet
wget --directory-prefix=../docker -nc --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id=1XkVpyYyYqQQ4FcgLIDUxg-GR1WI89-XC' -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id=1XkVpyYyYqQQ4FcgLIDUxg-GR1WI89-XC" -O deployment_image.tar && rm -rf /tmp/cookies.txt
#rsync -av -e ssh --exclude=*.pt --exclude=*.mp4 $PWD/../../go1_gym_deploy $PWD/../../runs $PWD/../../setup.py pi@192.168.12.1:/home/pi/go1_gym
rsync -av -e ssh --exclude=*.pt --exclude=*.mp4 $PWD/../../go1_gym_deploy $PWD/../../runs $PWD/../setup.py unitree@192.168.123.15:/home/unitree/go1_gym
#scp -r $PWD/../../runs pi@192.168.12.1:/home/pi/go1_gym
#scp -r $PWD/../../setup.py pi@192.168.12.1:/home/pi/go1_gym

View File

@ -1,12 +0,0 @@
from setuptools import find_packages
from distutils.core import setup
setup(
name='go1_gym',
version='1.0.0',
author='Gabriel Margolis',
license="BSD-3-Clause",
packages=find_packages(),
author_email='gmargo@mit.edu',
description='Toolkit for deployment of sim-to-real RL on the Unitree Go1.'
)

View File

@ -1,154 +0,0 @@
import lcm
import threading
import time
import select
import numpy as np
from go1_gym_deploy.lcm_types.leg_control_data_lcmt import leg_control_data_lcmt
from go1_gym_deploy.lcm_types.rc_command_lcmt import rc_command_lcmt
from go1_gym_deploy.lcm_types.state_estimator_lcmt import state_estimator_lcmt
from go1_gym_deploy.lcm_types.vicon_pose_lcmt import vicon_pose_lcmt
from go1_gym_deploy.lcm_types.camera_message_lcmt import camera_message_lcmt
from go1_gym_deploy.lcm_types.camera_message_rect_wide import camera_message_rect_wide
from go1_gym_deploy.lcm_types.camera_message_rect_wide_mask import camera_message_rect_wide_mask
class UnitreeLCMInspector:
def __init__(self, lc):
self.lc = lc
self.camera_names = ["front", "bottom", "left", "right", "rear"]
for cam_name in self.camera_names:
self.camera_subscription = self.lc.subscribe(f"rect_image_{cam_name}", self._rect_camera_cb)
for cam_name in self.camera_names:
self.camera_subscription = self.lc.subscribe(f"rect_image_{cam_name}_mask", self._mask_camera_cb)
self.camera_image_left = None
self.camera_image_right = None
self.camera_image_front = None
self.camera_image_bottom = None
self.camera_image_rear = None
self.ts = [time.time(), time.time(), time.time(), time.time(), time.time(),]
self.num_low_states = 0
def _rect_camera_cb(self, channel, data):
# message_types = [camera_message_rect_front, camera_message_rect_front_chin, camera_message_rect_left,
# camera_message_rect_right, camera_message_rect_rear_down]
# image_shapes = [(200, 200, 3), (100, 100, 3), (100, 232, 3), (100, 232, 3), (200, 200, 3)]
message_types = [camera_message_rect_wide, camera_message_rect_wide, camera_message_rect_wide, camera_message_rect_wide, camera_message_rect_wide]
image_shapes = [(116, 100, 3), (116, 100, 3), (116, 100, 3), (116, 100, 3), (116, 100, 3)]
cam_name = channel.split("_")[-1]
cam_id = self.camera_names.index(cam_name) + 1
print(channel, message_types[cam_id - 1])
msg = message_types[cam_id - 1].decode(data)
img = np.fromstring(msg.data, dtype=np.uint8)
img = np.flip(np.flip(
img.reshape((image_shapes[cam_id - 1][2], image_shapes[cam_id - 1][1], image_shapes[cam_id - 1][0])),
axis=0), axis=1).transpose(1, 2, 0)
if cam_id == 1:
self.camera_image_front = img
elif cam_id == 2:
self.camera_image_bottom = img
elif cam_id == 3:
self.camera_image_left = img
elif cam_id == 4:
self.camera_image_right = img
elif cam_id == 5:
self.camera_image_rear = img
else:
print("Image received from camera with unknown ID#!")
print(f"f{1. / (time.time() - self.ts[cam_id - 1])}: received py from {cam_name}!")
self.ts[cam_id-1] = time.time()
from PIL import Image
im = Image.fromarray(img)
im.save(f"{cam_name}_image.jpeg")
def _mask_camera_cb(self, channel, data):
message_types = [camera_message_rect_wide_mask for i in range(5)]
image_shapes = [(116, 100, 1) for i in range(5)]
cam_name = channel.split("_")[-2]
cam_id = self.camera_names.index(cam_name) + 1
print(channel, message_types[cam_id - 1])
msg = message_types[cam_id - 1].decode(data)
img = np.array(list(msg.data)).astype(np.uint8)
img = np.flip(np.flip(
img.reshape((image_shapes[cam_id - 1][2], image_shapes[cam_id - 1][1], image_shapes[cam_id - 1][0])),
axis=0), axis=1)
if cam_id == 1:
self.camera_image_front = img
elif cam_id == 2:
self.camera_image_bottom = img
elif cam_id == 3:
self.camera_image_left = img
elif cam_id == 4:
self.camera_image_right = img
elif cam_id == 5:
self.camera_image_rear = img
else:
print("Image received from camera with unknown ID#!")
print(f"f{1. / (time.time() - self.ts[cam_id - 1])}: received py from {cam_name}!")
self.ts[cam_id-1] = time.time()
from PIL import Image
# print(img[0])
im = Image.fromarray(img[0])
im.save(f"{cam_name}_mask_image.jpeg")
def publish_30Hz(self):
msg = camera_message_rect_wide()
msg.data = [0] * 34800
self.lc.publish("rect_image_rear", msg.encode())
time.sleep(1./30.)
def poll(self, cb=None):
t = time.time()
try:
while True:
timeout = 0.01
rfds, wfds, efds = select.select([self.lc.fileno()], [], [], timeout)
if rfds:
# print("message received!")
self.lc.handle()
# print(f'Freq {1. / (time.time() - t)} Hz'); t = time.time()
else:
continue
# print(f'waiting for message... Freq {1. / (time.time() - t)} Hz'); t = time.time()
# if cb is not None:
# cb()
except KeyboardInterrupt:
pass
def spin(self):
self.run_thread = threading.Thread(target=self.poll, daemon=False)
self.run_thread.start()
if __name__ == "__main__":
import lcm
lc = lcm.LCM("udpm://239.255.76.67:7667?ttl=255")
#check_lcm_msgs()
print("init")
insp = UnitreeLCMInspector(lc)
print("polling")
# insp.poll()
while True:
insp.publish_30Hz()

View File

@ -1,236 +0,0 @@
/*****************************************************************
Copyright (c) 2020, Unitree Robotics.Co.Ltd. All rights reserved.
******************************************************************/
#include "unitree_legged_sdk/unitree_legged_sdk.h"
#include "unitree_legged_sdk/joystick.h"
#include <math.h>
#include <iostream>
#include <stdio.h>
#include <stdint.h>
#include <thread>
#include <lcm/lcm-cpp.hpp>
#include "state_estimator_lcmt.hpp"
#include "leg_control_data_lcmt.hpp"
#include "pd_tau_targets_lcmt.hpp"
#include "rc_command_lcmt.hpp"
using namespace std;
using namespace UNITREE_LEGGED_SDK;
class Custom
{
public:
Custom(uint8_t level): safe(LeggedType::Go1), udp(level, 8090, "192.168.123.10", 8007) {
udp.InitCmdData(cmd);
}
void UDPRecv();
void UDPSend();
void RobotControl();
void init();
void handleActionLCM(const lcm::ReceiveBuffer *rbuf, const std::string & chan, const pd_tau_targets_lcmt * msg);
void _simpleLCMThread();
Safety safe;
UDP udp;
LowCmd cmd = {0};
LowState state = {0};
float qInit[3]={0};
float qDes[3]={0};
float sin_mid_q[3] = {0.0, 1.2, -2.0};
float Kp[3] = {0};
float Kd[3] = {0};
double time_consume = 0;
int rate_count = 0;
int sin_count = 0;
int motiontime = 0;
float dt = 0.002; // 0.001~0.01
lcm::LCM _simpleLCM;
std::thread _simple_LCM_thread;
bool _firstCommandReceived;
bool _firstRun;
state_estimator_lcmt body_state_simple = {0};
leg_control_data_lcmt joint_state_simple = {0};
pd_tau_targets_lcmt joint_command_simple = {0};
rc_command_lcmt rc_command = {0};
xRockerBtnDataStruct _keyData;
int mode = 0;
};
void Custom::init()
{
_simpleLCM.subscribe("pd_plustau_targets", &Custom::handleActionLCM, this);
_simple_LCM_thread = std::thread(&Custom::_simpleLCMThread, this);
_firstCommandReceived = false;
_firstRun = true;
// set nominal pose
for(int i = 0; i < 12; i++){
joint_command_simple.qd_des[i] = 0;
joint_command_simple.tau_ff[i] = 0;
joint_command_simple.kp[i] = 20.;
joint_command_simple.kd[i] = 0.5;
}
joint_command_simple.q_des[0] = -0.3;
joint_command_simple.q_des[1] = 1.2;
joint_command_simple.q_des[2] = -2.721;
joint_command_simple.q_des[3] = 0.3;
joint_command_simple.q_des[4] = 1.2;
joint_command_simple.q_des[5] = -2.721;
joint_command_simple.q_des[6] = -0.3;
joint_command_simple.q_des[7] = 1.2;
joint_command_simple.q_des[8] = -2.721;
joint_command_simple.q_des[9] = 0.3;
joint_command_simple.q_des[10] = 1.2;
joint_command_simple.q_des[11] = -2.721;
printf("SET NOMINAL POSE");
}
void Custom::UDPRecv()
{
udp.Recv();
}
void Custom::UDPSend()
{
udp.Send();
}
double jointLinearInterpolation(double initPos, double targetPos, double rate)
{
double p;
rate = std::min(std::max(rate, 0.0), 1.0);
p = initPos*(1-rate) + targetPos*rate;
return p;
}
void Custom::handleActionLCM(const lcm::ReceiveBuffer *rbuf, const std::string & chan, const pd_tau_targets_lcmt * msg){
(void) rbuf;
(void) chan;
joint_command_simple = *msg;
_firstCommandReceived = true;
}
void Custom::_simpleLCMThread(){
while(true){
_simpleLCM.handle();
}
}
void Custom::RobotControl()
{
motiontime++;
udp.GetRecv(state);
memcpy(&_keyData, &state.wirelessRemote[0], 40);
rc_command.left_stick[0] = _keyData.lx;
rc_command.left_stick[1] = _keyData.ly;
rc_command.right_stick[0] = _keyData.rx;
rc_command.right_stick[1] = _keyData.ry;
rc_command.right_lower_right_switch = _keyData.btn.components.R2;
rc_command.right_upper_switch = _keyData.btn.components.R1;
rc_command.left_lower_left_switch = _keyData.btn.components.L2;
rc_command.left_upper_switch = _keyData.btn.components.L1;
if(_keyData.btn.components.A > 0){
mode = 0;
} else if(_keyData.btn.components.B > 0){
mode = 1;
}else if(_keyData.btn.components.X > 0){
mode = 2;
}else if(_keyData.btn.components.Y > 0){
mode = 3;
}else if(_keyData.btn.components.up > 0){
mode = 4;
}else if(_keyData.btn.components.right > 0){
mode = 5;
}else if(_keyData.btn.components.down > 0){
mode = 6;
}else if(_keyData.btn.components.left > 0){
mode = 7;
}
rc_command.mode = mode;
// publish state to LCM
for(int i = 0; i < 12; i++){
joint_state_simple.q[i] = state.motorState[i].q;
joint_state_simple.qd[i] = state.motorState[i].dq;
joint_state_simple.tau_est[i] = state.motorState[i].tauEst;
}
for(int i = 0; i < 4; i++){
body_state_simple.quat[i] = state.imu.quaternion[i];
}
for(int i = 0; i < 3; i++){
body_state_simple.rpy[i] = state.imu.rpy[i];
body_state_simple.aBody[i] = state.imu.accelerometer[i];
body_state_simple.omegaBody[i] = state.imu.gyroscope[i];
}
for(int i = 0; i < 4; i++){
body_state_simple.contact_estimate[i] = state.footForce[i];
}
_simpleLCM.publish("state_estimator_data", &body_state_simple);
_simpleLCM.publish("leg_control_data", &joint_state_simple);
_simpleLCM.publish("rc_command", &rc_command);
if(_firstRun && joint_state_simple.q[0] != 0){
for(int i = 0; i < 12; i++){
joint_command_simple.q_des[i] = joint_state_simple.q[i];
}
_firstRun = false;
}
for(int i = 0; i < 12; i++){
cmd.motorCmd[i].q = joint_command_simple.q_des[i];
cmd.motorCmd[i].dq = joint_command_simple.qd_des[i];
cmd.motorCmd[i].Kp = joint_command_simple.kp[i];
cmd.motorCmd[i].Kd = joint_command_simple.kd[i];
cmd.motorCmd[i].tau = joint_command_simple.tau_ff[i];
}
safe.PositionLimit(cmd);
int res1 = safe.PowerProtect(cmd, state, 9);
udp.SetSend(cmd);
}
int main(void)
{
std::cout << "Communication level is set to LOW-level." << std::endl
<< "WARNING: Make sure the robot is hung up." << std::endl
<< "Press Enter to continue..." << std::endl;
std::cin.ignore();
Custom custom(LOWLEVEL);
custom.init();
// InitEnvironment();
LoopFunc loop_control("control_loop", custom.dt, boost::bind(&Custom::RobotControl, &custom));
LoopFunc loop_udpSend("udp_send", custom.dt, 3, boost::bind(&Custom::UDPSend, &custom));
LoopFunc loop_udpRecv("udp_recv", custom.dt, 3, boost::bind(&Custom::UDPRecv, &custom));
loop_udpSend.start();
loop_udpRecv.start();
loop_control.start();
while(1){
sleep(10);
};
return 0;
}

View File

@ -1,406 +0,0 @@
import math
import select
import threading
import time
import numpy as np
from go1_gym_deploy.lcm_types.leg_control_data_lcmt import leg_control_data_lcmt
from go1_gym_deploy.lcm_types.rc_command_lcmt import rc_command_lcmt
from go1_gym_deploy.lcm_types.state_estimator_lcmt import state_estimator_lcmt
from go1_gym_deploy.lcm_types.camera_message_lcmt import camera_message_lcmt
from go1_gym_deploy.lcm_types.camera_message_rect_wide import camera_message_rect_wide
def get_rpy_from_quaternion(q):
w, x, y, z = q
r = np.arctan2(2 * (w * x + y * z), 1 - 2 * (x ** 2 + y ** 2))
p = np.arcsin(2 * (w * y - z * x))
y = np.arctan2(2 * (w * z + x * y), 1 - 2 * (y ** 2 + z ** 2))
return np.array([r, p, y])
def get_rotation_matrix_from_rpy(rpy):
"""
Get rotation matrix from the given quaternion.
Args:
q (np.array[float[4]]): quaternion [w,x,y,z]
Returns:
np.array[float[3,3]]: rotation matrix.
"""
r, p, y = rpy
R_x = np.array([[1, 0, 0],
[0, math.cos(r), -math.sin(r)],
[0, math.sin(r), math.cos(r)]
])
R_y = np.array([[math.cos(p), 0, math.sin(p)],
[0, 1, 0],
[-math.sin(p), 0, math.cos(p)]
])
R_z = np.array([[math.cos(y), -math.sin(y), 0],
[math.sin(y), math.cos(y), 0],
[0, 0, 1]
])
rot = np.dot(R_z, np.dot(R_y, R_x))
return rot
class StateEstimator:
def __init__(self, lc, use_cameras=True):
# reverse legs
self.joint_idxs = [3, 4, 5, 0, 1, 2, 9, 10, 11, 6, 7, 8]
self.contact_idxs = [1, 0, 3, 2]
# self.joint_idxs = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
self.lc = lc
self.joint_pos = np.zeros(12)
self.joint_vel = np.zeros(12)
self.tau_est = np.zeros(12)
self.world_lin_vel = np.zeros(3)
self.world_ang_vel = np.zeros(3)
self.euler = np.zeros(3)
self.R = np.eye(3)
self.buf_idx = 0
self.smoothing_length = 12
self.deuler_history = np.zeros((self.smoothing_length, 3))
self.dt_history = np.zeros((self.smoothing_length, 1))
self.euler_prev = np.zeros(3)
self.timuprev = time.time()
self.body_lin_vel = np.zeros(3)
self.body_ang_vel = np.zeros(3)
self.smoothing_ratio = 0.2
self.contact_state = np.ones(4)
self.mode = 0
self.ctrlmode_left = 0
self.ctrlmode_right = 0
self.left_stick = [0, 0]
self.right_stick = [0, 0]
self.left_upper_switch = 0
self.left_lower_left_switch = 0
self.left_lower_right_switch = 0
self.right_upper_switch = 0
self.right_lower_left_switch = 0
self.right_lower_right_switch = 0
self.left_upper_switch_pressed = 0
self.left_lower_left_switch_pressed = 0
self.left_lower_right_switch_pressed = 0
self.right_upper_switch_pressed = 0
self.right_lower_left_switch_pressed = 0
self.right_lower_right_switch_pressed = 0
# default trotting gait
self.cmd_freq = 3.0
self.cmd_phase = 0.5
self.cmd_offset = 0.0
self.cmd_duration = 0.5
self.init_time = time.time()
self.received_first_legdata = False
self.imu_subscription = self.lc.subscribe("state_estimator_data", self._imu_cb)
self.legdata_state_subscription = self.lc.subscribe("leg_control_data", self._legdata_cb)
self.rc_command_subscription = self.lc.subscribe("rc_command", self._rc_command_cb)
if use_cameras:
for cam_id in [1, 2, 3, 4, 5]:
self.camera_subscription = self.lc.subscribe(f"camera{cam_id}", self._camera_cb)
self.camera_names = ["front", "bottom", "left", "right", "rear"]
for cam_name in self.camera_names:
self.camera_subscription = self.lc.subscribe(f"rect_image_{cam_name}", self._rect_camera_cb)
self.camera_image_left = None
self.camera_image_right = None
self.camera_image_front = None
self.camera_image_bottom = None
self.camera_image_rear = None
self.body_loc = np.array([0, 0, 0])
self.body_quat = np.array([0, 0, 0, 1])
def get_body_linear_vel(self):
self.body_lin_vel = np.dot(self.R.T, self.world_lin_vel)
return self.body_lin_vel
def get_body_angular_vel(self):
self.body_ang_vel = self.smoothing_ratio * np.mean(self.deuler_history / self.dt_history, axis=0) + (
1 - self.smoothing_ratio) * self.body_ang_vel
return self.body_ang_vel
def get_gravity_vector(self):
grav = np.dot(self.R.T, np.array([0, 0, -1]))
return grav
def get_contact_state(self):
return self.contact_state[self.contact_idxs]
def get_rpy(self):
return self.euler
def get_command(self):
MODES_LEFT = ["body_height", "lat_vel", "stance_width"]
MODES_RIGHT = ["step_frequency", "footswing_height", "body_pitch"]
if self.left_upper_switch_pressed:
self.ctrlmode_left = (self.ctrlmode_left + 1) % 3
self.left_upper_switch_pressed = False
if self.right_upper_switch_pressed:
self.ctrlmode_right = (self.ctrlmode_right + 1) % 3
self.right_upper_switch_pressed = False
MODE_LEFT = MODES_LEFT[self.ctrlmode_left]
MODE_RIGHT = MODES_RIGHT[self.ctrlmode_right]
# always in use
cmd_x = 1 * self.left_stick[1]
cmd_yaw = -1 * self.right_stick[0]
# default values
cmd_y = 0. # -1 * self.left_stick[0]
cmd_height = 0.
cmd_footswing = 0.08
cmd_stance_width = 0.33
cmd_stance_length = 0.40
cmd_ori_pitch = 0.
cmd_ori_roll = 0.
cmd_freq = 3.0
# joystick commands
if MODE_LEFT == "body_height":
cmd_height = 0.3 * self.left_stick[0]
elif MODE_LEFT == "lat_vel":
cmd_y = 0.6 * self.left_stick[0]
elif MODE_LEFT == "stance_width":
cmd_stance_width = 0.275 + 0.175 * self.left_stick[0]
if MODE_RIGHT == "step_frequency":
min_freq = 2.0
max_freq = 4.0
cmd_freq = (1 + self.right_stick[1]) / 2 * (max_freq - min_freq) + min_freq
elif MODE_RIGHT == "footswing_height":
cmd_footswing = max(0, self.right_stick[1]) * 0.32 + 0.03
elif MODE_RIGHT == "body_pitch":
cmd_ori_pitch = -0.4 * self.right_stick[1]
# gait buttons
if self.mode == 0:
self.cmd_phase = 0.5
self.cmd_offset = 0.0
self.cmd_bound = 0.0
self.cmd_duration = 0.5
elif self.mode == 1:
self.cmd_phase = 0.0
self.cmd_offset = 0.0
self.cmd_bound = 0.0
self.cmd_duration = 0.5
elif self.mode == 2:
self.cmd_phase = 0.0
self.cmd_offset = 0.5
self.cmd_bound = 0.0
self.cmd_duration = 0.5
elif self.mode == 3:
self.cmd_phase = 0.0
self.cmd_offset = 0.0
self.cmd_bound = 0.5
self.cmd_duration = 0.5
else:
self.cmd_phase = 0.5
self.cmd_offset = 0.0
self.cmd_bound = 0.0
self.cmd_duration = 0.5
return np.array([cmd_x, cmd_y, cmd_yaw, cmd_height, cmd_freq, self.cmd_phase, self.cmd_offset, self.cmd_bound,
self.cmd_duration, cmd_footswing, cmd_ori_pitch, cmd_ori_roll, cmd_stance_width,
cmd_stance_length, 0, 0, 0, 0, 0])
def get_buttons(self):
return np.array([self.left_lower_left_switch, self.left_upper_switch, self.right_lower_right_switch, self.right_upper_switch])
def get_dof_pos(self):
# print("dofposquery", self.joint_pos[self.joint_idxs])
return self.joint_pos[self.joint_idxs]
def get_dof_vel(self):
return self.joint_vel[self.joint_idxs]
def get_tau_est(self):
return self.tau_est[self.joint_idxs]
def get_yaw(self):
return self.euler[2]
def get_body_loc(self):
return np.array(self.body_loc)
def get_body_quat(self):
return np.array(self.body_quat)
def get_camera_front(self):
return self.camera_image_front
def get_camera_bottom(self):
return self.camera_image_bottom
def get_camera_rear(self):
return self.camera_image_rear
def get_camera_left(self):
return self.camera_image_left
def get_camera_right(self):
return self.camera_image_right
def _legdata_cb(self, channel, data):
# print("update legdata")
if not self.received_first_legdata:
self.received_first_legdata = True
print(f"First legdata: {time.time() - self.init_time}")
msg = leg_control_data_lcmt.decode(data)
# print(msg.q)
self.joint_pos = np.array(msg.q)
self.joint_vel = np.array(msg.qd)
self.tau_est = np.array(msg.tau_est)
# print(f"update legdata {msg.id}")
def _imu_cb(self, channel, data):
# print("update imu")
msg = state_estimator_lcmt.decode(data)
self.euler = np.array(msg.rpy)
self.R = get_rotation_matrix_from_rpy(self.euler)
self.contact_state = 1.0 * (np.array(msg.contact_estimate) > 200)
self.deuler_history[self.buf_idx % self.smoothing_length, :] = msg.rpy - self.euler_prev
self.dt_history[self.buf_idx % self.smoothing_length] = time.time() - self.timuprev
self.timuprev = time.time()
self.buf_idx += 1
self.euler_prev = np.array(msg.rpy)
def _sensor_cb(self, channel, data):
pass
def _rc_command_cb(self, channel, data):
msg = rc_command_lcmt.decode(data)
self.left_upper_switch_pressed = ((msg.left_upper_switch and not self.left_upper_switch) or self.left_upper_switch_pressed)
self.left_lower_left_switch_pressed = ((msg.left_lower_left_switch and not self.left_lower_left_switch) or self.left_lower_left_switch_pressed)
self.left_lower_right_switch_pressed = ((msg.left_lower_right_switch and not self.left_lower_right_switch) or self.left_lower_right_switch_pressed)
self.right_upper_switch_pressed = ((msg.right_upper_switch and not self.right_upper_switch) or self.right_upper_switch_pressed)
self.right_lower_left_switch_pressed = ((msg.right_lower_left_switch and not self.right_lower_left_switch) or self.right_lower_left_switch_pressed)
self.right_lower_right_switch_pressed = ((msg.right_lower_right_switch and not self.right_lower_right_switch) or self.right_lower_right_switch_pressed)
self.mode = msg.mode
self.right_stick = msg.right_stick
self.left_stick = msg.left_stick
self.left_upper_switch = msg.left_upper_switch
self.left_lower_left_switch = msg.left_lower_left_switch
self.left_lower_right_switch = msg.left_lower_right_switch
self.right_upper_switch = msg.right_upper_switch
self.right_lower_left_switch = msg.right_lower_left_switch
self.right_lower_right_switch = msg.right_lower_right_switch
# print(self.right_stick, self.left_stick)
def _camera_cb(self, channel, data):
msg = camera_message_lcmt.decode(data)
img = np.fromstring(msg.data, dtype=np.uint8)
img = img.reshape((3, 200, 464)).transpose(1, 2, 0)
cam_id = int(channel[-1])
if cam_id == 1:
self.camera_image_front = img
elif cam_id == 2:
self.camera_image_bottom = img
elif cam_id == 3:
self.camera_image_left = img
elif cam_id == 4:
self.camera_image_right = img
elif cam_id == 5:
self.camera_image_rear = img
else:
print("Image received from camera with unknown ID#!")
#im = Image.fromarray(img).convert('RGB')
#im.save("test_image_" + channel + ".jpg")
#print(channel)
def _rect_camera_cb(self, channel, data):
message_types = [camera_message_rect_wide, camera_message_rect_wide, camera_message_rect_wide,
camera_message_rect_wide, camera_message_rect_wide]
image_shapes = [(116, 100, 3), (116, 100, 3), (116, 100, 3), (116, 100, 3), (116, 100, 3)]
cam_name = channel.split("_")[-1]
# print(f"received py from {cam_name}")
cam_id = self.camera_names.index(cam_name) + 1
msg = message_types[cam_id - 1].decode(data)
img = np.fromstring(msg.data, dtype=np.uint8)
img = np.flip(np.flip(
img.reshape((image_shapes[cam_id - 1][2], image_shapes[cam_id - 1][1], image_shapes[cam_id - 1][0])),
axis=0), axis=1).transpose(1, 2, 0)
# print(img.shape)
# img = np.flip(np.flip(img.reshape(image_shapes[cam_id - 1]), axis=0), axis=1)[:, :,
# [2, 1, 0]] # .transpose(1, 2, 0)
if cam_id == 1:
self.camera_image_front = img
elif cam_id == 2:
self.camera_image_bottom = img
elif cam_id == 3:
self.camera_image_left = img
elif cam_id == 4:
self.camera_image_right = img
elif cam_id == 5:
self.camera_image_rear = img
else:
print("Image received from camera with unknown ID#!")
def poll(self, cb=None):
t = time.time()
try:
while True:
timeout = 0.01
rfds, wfds, efds = select.select([self.lc.fileno()], [], [], timeout)
if rfds:
# print("message received!")
self.lc.handle()
# print(f'Freq {1. / (time.time() - t)} Hz'); t = time.time()
else:
continue
# print(f'waiting for message... Freq {1. / (time.time() - t)} Hz'); t = time.time()
# if cb is not None:
# cb()
except KeyboardInterrupt:
pass
def spin(self):
self.run_thread = threading.Thread(target=self.poll, daemon=False)
self.run_thread.start()
def close(self):
self.lc.unsubscribe(self.legdata_state_subscription)
if __name__ == "__main__":
import lcm
lc = lcm.LCM("udpm://239.255.76.67:7667?ttl=255")
se = StateEstimator(lc)
se.poll()

View File

@ -1,231 +0,0 @@
import torch
class CommandProfile:
def __init__(self, dt, max_time_s=10.):
self.dt = dt
self.max_timestep = int(max_time_s / self.dt)
self.commands = torch.zeros((self.max_timestep, 9))
self.start_time = 0
def get_command(self, t):
timestep = int((t - self.start_time) / self.dt)
timestep = min(timestep, self.max_timestep - 1)
return self.commands[timestep, :]
def get_buttons(self):
return [0, 0, 0, 0]
def reset(self, reset_time):
self.start_time = reset_time
class ConstantAccelerationProfile(CommandProfile):
def __init__(self, dt, max_speed, accel_time, zero_buf_time=0):
super().__init__(dt)
zero_buf_timesteps = int(zero_buf_time / self.dt)
accel_timesteps = int(accel_time / self.dt)
self.commands[:zero_buf_timesteps] = 0
self.commands[zero_buf_timesteps:zero_buf_timesteps + accel_timesteps, 0] = torch.arange(0, max_speed,
step=max_speed / accel_timesteps)
self.commands[zero_buf_timesteps + accel_timesteps:, 0] = max_speed
class ElegantForwardProfile(CommandProfile):
def __init__(self, dt, max_speed, accel_time, duration, deaccel_time, zero_buf_time=0):
import numpy as np
zero_buf_timesteps = int(zero_buf_time / dt)
accel_timesteps = int(accel_time / dt)
duration_timesteps = int(duration / dt)
deaccel_timesteps = int(deaccel_time / dt)
total_time_s = zero_buf_time + accel_time + duration + deaccel_time
super().__init__(dt, total_time_s)
x_vel_cmds = [0] * zero_buf_timesteps + [*np.linspace(0, max_speed, accel_timesteps)] + \
[max_speed] * duration_timesteps + [*np.linspace(max_speed, 0, deaccel_timesteps)]
self.commands[:len(x_vel_cmds), 0] = torch.Tensor(x_vel_cmds)
class ElegantYawProfile(CommandProfile):
def __init__(self, dt, max_speed, zero_buf_time, accel_time, duration, deaccel_time, yaw_rate):
import numpy as np
zero_buf_timesteps = int(zero_buf_time / dt)
accel_timesteps = int(accel_time / dt)
duration_timesteps = int(duration / dt)
deaccel_timesteps = int(deaccel_time / dt)
total_time_s = zero_buf_time + accel_time + duration + deaccel_time
super().__init__(dt, total_time_s)
x_vel_cmds = [0] * zero_buf_timesteps + [*np.linspace(0, max_speed, accel_timesteps)] + \
[max_speed] * duration_timesteps + [*np.linspace(max_speed, 0, deaccel_timesteps)]
yaw_vel_cmds = [0] * zero_buf_timesteps + [0] * accel_timesteps + \
[yaw_rate] * duration_timesteps + [0] * deaccel_timesteps
self.commands[:len(x_vel_cmds), 0] = torch.Tensor(x_vel_cmds)
self.commands[:len(yaw_vel_cmds), 2] = torch.Tensor(yaw_vel_cmds)
class ElegantGaitProfile(CommandProfile):
def __init__(self, dt, filename):
import numpy as np
import json
with open(f'../command_profiles/{filename}', 'r') as file:
command_sequence = json.load(file)
len_command_sequence = len(command_sequence["x_vel_cmd"])
total_time_s = int(len_command_sequence / dt)
super().__init__(dt, total_time_s)
self.commands[:len_command_sequence, 0] = torch.Tensor(command_sequence["x_vel_cmd"])
self.commands[:len_command_sequence, 2] = torch.Tensor(command_sequence["yaw_vel_cmd"])
self.commands[:len_command_sequence, 3] = torch.Tensor(command_sequence["height_cmd"])
self.commands[:len_command_sequence, 4] = torch.Tensor(command_sequence["frequency_cmd"])
self.commands[:len_command_sequence, 5] = torch.Tensor(command_sequence["offset_cmd"])
self.commands[:len_command_sequence, 6] = torch.Tensor(command_sequence["phase_cmd"])
self.commands[:len_command_sequence, 7] = torch.Tensor(command_sequence["bound_cmd"])
self.commands[:len_command_sequence, 8] = torch.Tensor(command_sequence["duration_cmd"])
class RCControllerProfile(CommandProfile):
def __init__(self, dt, state_estimator, x_scale=1.0, y_scale=1.0, yaw_scale=1.0, probe_vel_multiplier=1.0):
super().__init__(dt)
self.state_estimator = state_estimator
self.x_scale = x_scale
self.y_scale = y_scale
self.yaw_scale = yaw_scale
self.probe_vel_multiplier = probe_vel_multiplier
self.triggered_commands = {i: None for i in range(4)} # command profiles for each action button on the controller
self.currently_triggered = [0, 0, 0, 0]
self.button_states = [0, 0, 0, 0]
def get_command(self, t, probe=False):
command = self.state_estimator.get_command()
command[0] = command[0] * self.x_scale
command[1] = command[1] * self.y_scale
command[2] = command[2] * self.yaw_scale
reset_timer = False
if probe:
command[0] = command[0] * self.probe_vel_multiplier
command[2] = command[2] * self.probe_vel_multiplier
# check for action buttons
prev_button_states = self.button_states[:]
self.button_states = self.state_estimator.get_buttons()
for button in range(4):
if self.triggered_commands[button] is not None:
if self.button_states[button] == 1 and prev_button_states[button] == 0:
if not self.currently_triggered[button]:
# reset the triggered action
self.triggered_commands[button].reset(t)
# reset the internal timing variable
reset_timer = True
self.currently_triggered[button] = True
else:
self.currently_triggered[button] = False
# execute the triggered action
if self.currently_triggered[button] and t < self.triggered_commands[button].max_timestep:
command = self.triggered_commands[button].get_command(t)
return command, reset_timer
def add_triggered_command(self, button_idx, command_profile):
self.triggered_commands[button_idx] = command_profile
def get_buttons(self):
return self.state_estimator.get_buttons()
class RCControllerProfileAccel(RCControllerProfile):
def __init__(self, dt, state_estimator, x_scale=1.0, y_scale=1.0, yaw_scale=1.0):
super().__init__(dt, state_estimator, x_scale=x_scale, y_scale=y_scale, yaw_scale=yaw_scale)
self.x_scale, self.y_scale, self.yaw_scale = self.x_scale / 100., self.y_scale / 100., self.yaw_scale / 100.
self.velocity_command = torch.zeros(3)
def get_command(self, t):
accel_command = self.state_estimator.get_command()
self.velocity_command[0] = self.velocity_command[0] + accel_command[0] * self.x_scale
self.velocity_command[1] = self.velocity_command[1] + accel_command[1] * self.y_scale
self.velocity_command[2] = self.velocity_command[2] + accel_command[2] * self.yaw_scale
# check for action buttons
prev_button_states = self.button_states[:]
self.button_states = self.state_estimator.get_buttons()
for button in range(4):
if self.button_states[button] == 1 and self.triggered_commands[button] is not None:
if prev_button_states[button] == 0:
# reset the triggered action
self.triggered_commands[button].reset(t)
# execute the triggered action
return self.triggered_commands[button].get_command(t)
return self.velocity_command[:]
def add_triggered_command(self, button_idx, command_profile):
self.triggered_commands[button_idx] = command_profile
def get_buttons(self):
return self.state_estimator.get_buttons()
class KeyboardProfile(CommandProfile):
# for control via keyboard inputs to isaac gym visualizer
def __init__(self, dt, isaac_env, x_scale=1.0, y_scale=1.0, yaw_scale=1.0):
super().__init__(dt)
from isaacgym.gymapi import KeyboardInput
self.gym = isaac_env.gym
self.viewer = isaac_env.viewer
self.x_scale = x_scale
self.y_scale = y_scale
self.yaw_scale = yaw_scale
self.gym.subscribe_viewer_keyboard_event(self.viewer, KeyboardInput.KEY_UP, "FORWARD")
self.gym.subscribe_viewer_keyboard_event(self.viewer, KeyboardInput.KEY_DOWN, "REVERSE")
self.gym.subscribe_viewer_keyboard_event(self.viewer, KeyboardInput.KEY_LEFT, "LEFT")
self.gym.subscribe_viewer_keyboard_event(self.viewer, KeyboardInput.KEY_RIGHT, "RIGHT")
self.keyb_command = [0, 0, 0]
self.command = [0, 0, 0]
def get_command(self, t):
events = self.gym.query_viewer_action_events(self.viewer)
events_dict = {event.action: event.value for event in events}
print(events_dict)
if "FORWARD" in events_dict and events_dict["FORWARD"] == 1.0: self.keyb_command[0] = 1.0
if "FORWARD" in events_dict and events_dict["FORWARD"] == 0.0: self.keyb_command[0] = 0.0
if "REVERSE" in events_dict and events_dict["REVERSE"] == 1.0: self.keyb_command[0] = -1.0
if "REVERSE" in events_dict and events_dict["REVERSE"] == 0.0: self.keyb_command[0] = 0.0
if "LEFT" in events_dict and events_dict["LEFT"] == 1.0: self.keyb_command[1] = 1.0
if "LEFT" in events_dict and events_dict["LEFT"] == 0.0: self.keyb_command[1] = 0.0
if "RIGHT" in events_dict and events_dict["RIGHT"] == 1.0: self.keyb_command[1] = -1.0
if "RIGHT" in events_dict and events_dict["RIGHT"] == 0.0: self.keyb_command[1] = 0.0
self.command[0] = self.keyb_command[0] * self.x_scale
self.command[1] = self.keyb_command[2] * self.y_scale
self.command[2] = self.keyb_command[1] * self.yaw_scale
print(self.command)
return self.command
if __name__ == "__main__":
cmdprof = ConstantAccelerationProfile(dt=0.2, max_speed=4, accel_time=3)
print(cmdprof.commands)
print(cmdprof.get_command(2))

View File

@ -1,222 +0,0 @@
import copy
import time
import os
import numpy as np
import torch
from go1_gym_deploy.utils.logger import MultiLogger
class DeploymentRunner:
def __init__(self, experiment_name="unnamed", se=None, log_root="."):
self.agents = {}
self.policy = None
self.command_profile = None
self.logger = MultiLogger()
self.se = se
self.vision_server = None
self.log_root = log_root
self.init_log_filename()
self.control_agent_name = None
self.command_agent_name = None
self.triggered_commands = {i: None for i in range(4)} # command profiles for each action button on the controller
self.button_states = np.zeros(4)
self.is_currently_probing = False
self.is_currently_logging = [False, False, False, False]
def init_log_filename(self):
datetime = time.strftime("%Y/%m_%d/%H_%M_%S")
for i in range(100):
try:
os.makedirs(f"{self.log_root}/{datetime}_{i}")
self.log_filename = f"{self.log_root}/{datetime}_{i}/log.pkl"
return
except FileExistsError:
continue
def add_open_loop_agent(self, agent, name):
self.agents[name] = agent
self.logger.add_robot(name, agent.env.cfg)
def add_control_agent(self, agent, name):
self.control_agent_name = name
self.agents[name] = agent
self.logger.add_robot(name, agent.env.cfg)
def add_vision_server(self, vision_server):
self.vision_server = vision_server
def set_command_agents(self, name):
self.command_agent = name
def add_policy(self, policy):
self.policy = policy
def add_command_profile(self, command_profile):
self.command_profile = command_profile
def calibrate(self, wait=True, low=False):
# first, if the robot is not in nominal pose, move slowly to the nominal pose
for agent_name in self.agents.keys():
if hasattr(self.agents[agent_name], "get_obs"):
agent = self.agents[agent_name]
agent.get_obs()
joint_pos = agent.dof_pos
if low:
final_goal = np.array([0., 0.3, -0.7,
0., 0.3, -0.7,
0., 0.3, -0.7,
0., 0.3, -0.7,])
else:
final_goal = np.zeros(12)
nominal_joint_pos = agent.default_dof_pos
print(f"About to calibrate; the robot will stand [Press R2 to calibrate]")
while wait:
self.button_states = self.command_profile.get_buttons()
if self.command_profile.state_estimator.right_lower_right_switch_pressed:
self.command_profile.state_estimator.right_lower_right_switch_pressed = False
break
cal_action = np.zeros((agent.num_envs, agent.num_actions))
target_sequence = []
target = joint_pos - nominal_joint_pos
while np.max(np.abs(target - final_goal)) > 0.01:
target -= np.clip((target - final_goal), -0.05, 0.05)
target_sequence += [copy.deepcopy(target)]
for target in target_sequence:
next_target = target
if isinstance(agent.cfg, dict):
hip_reduction = agent.cfg["control"]["hip_scale_reduction"]
action_scale = agent.cfg["control"]["action_scale"]
else:
hip_reduction = agent.cfg.control.hip_scale_reduction
action_scale = agent.cfg.control.action_scale
next_target[[0, 3, 6, 9]] /= hip_reduction
next_target = next_target / action_scale
cal_action[:, 0:12] = next_target
agent.step(torch.from_numpy(cal_action))
agent.get_obs()
time.sleep(0.05)
print("Starting pose calibrated [Press R2 to start controller]")
while True:
self.button_states = self.command_profile.get_buttons()
if self.command_profile.state_estimator.right_lower_right_switch_pressed:
self.command_profile.state_estimator.right_lower_right_switch_pressed = False
break
for agent_name in self.agents.keys():
obs = self.agents[agent_name].reset()
if agent_name == self.control_agent_name:
control_obs = obs
return control_obs
def run(self, num_log_steps=1000000000, max_steps=100000000, logging=True):
assert self.control_agent_name is not None, "cannot deploy, runner has no control agent!"
assert self.policy is not None, "cannot deploy, runner has no policy!"
assert self.command_profile is not None, "cannot deploy, runner has no command profile!"
# TODO: add basic test for comms
for agent_name in self.agents.keys():
obs = self.agents[agent_name].reset()
if agent_name == self.control_agent_name:
control_obs = obs
control_obs = self.calibrate(wait=True)
# now, run control loop
try:
for i in range(max_steps):
policy_info = {}
action = self.policy(control_obs, policy_info)
for agent_name in self.agents.keys():
obs, ret, done, info = self.agents[agent_name].step(action)
info.update(policy_info)
info.update({"observation": obs, "reward": ret, "done": done, "timestep": i,
"time": i * self.agents[self.control_agent_name].dt, "action": action, "rpy": self.agents[self.control_agent_name].se.get_rpy(), "torques": self.agents[self.control_agent_name].torques})
if logging: self.logger.log(agent_name, info)
if agent_name == self.control_agent_name:
control_obs, control_ret, control_done, control_info = obs, ret, done, info
# bad orientation emergency stop
rpy = self.agents[self.control_agent_name].se.get_rpy()
if abs(rpy[0]) > 1.6 or abs(rpy[1]) > 1.6:
self.calibrate(wait=False, low=True)
# check for logging command
prev_button_states = self.button_states[:]
self.button_states = self.command_profile.get_buttons()
if self.command_profile.state_estimator.left_lower_left_switch_pressed:
if not self.is_currently_probing:
print("START LOGGING")
self.is_currently_probing = True
self.agents[self.control_agent_name].set_probing(True)
self.init_log_filename()
self.logger.reset()
else:
print("SAVE LOG")
self.is_currently_probing = False
self.agents[self.control_agent_name].set_probing(False)
# calibrate, log, and then resume control
control_obs = self.calibrate(wait=False)
self.logger.save(self.log_filename)
self.init_log_filename()
self.logger.reset()
time.sleep(1)
control_obs = self.agents[self.control_agent_name].reset()
self.command_profile.state_estimator.left_lower_left_switch_pressed = False
for button in range(4):
if self.command_profile.currently_triggered[button]:
if not self.is_currently_logging[button]:
print("START LOGGING")
self.is_currently_logging[button] = True
self.init_log_filename()
self.logger.reset()
else:
if self.is_currently_logging[button]:
print("SAVE LOG")
self.is_currently_logging[button] = False
# calibrate, log, and then resume control
control_obs = self.calibrate(wait=False)
self.logger.save(self.log_filename)
self.init_log_filename()
self.logger.reset()
time.sleep(1)
control_obs = self.agents[self.control_agent_name].reset()
if self.command_profile.state_estimator.right_lower_right_switch_pressed:
control_obs = self.calibrate(wait=False)
time.sleep(1)
self.command_profile.state_estimator.right_lower_right_switch_pressed = False
# self.button_states = self.command_profile.get_buttons()
while not self.command_profile.state_estimator.right_lower_right_switch_pressed:
time.sleep(0.01)
# self.button_states = self.command_profile.get_buttons()
self.command_profile.state_estimator.right_lower_right_switch_pressed = False
# finally, return to the nominal pose
control_obs = self.calibrate(wait=False)
self.logger.save(self.log_filename)
except KeyboardInterrupt:
self.logger.save(self.log_filename)

View File

@ -1,79 +0,0 @@
import copy
import pickle as pkl
import numpy as np
import torch
def class_to_dict(obj) -> dict:
if not hasattr(obj, "__dict__"):
return obj
result = {}
for key in dir(obj):
if key.startswith("_") or key == "terrain":
continue
element = []
val = getattr(obj, key)
if isinstance(val, list):
for item in val:
element.append(class_to_dict(item))
else:
print(key)
element = class_to_dict(val)
result[key] = element
return result
class MultiLogger:
def __init__(self):
self.loggers = {}
def add_robot(self, name, cfg):
print(name, cfg)
self.loggers[name] = EpisodeLogger(cfg)
def log(self, name, info):
self.loggers[name].log(info)
def save(self, filename):
with open(filename, 'wb') as file:
logdict = {}
for key in self.loggers.keys():
logdict[key] = [class_to_dict(self.loggers[key].cfg), self.loggers[key].infos]
pkl.dump(logdict, file)
print(f"Saved log! Number of timesteps: {[len(self.loggers[key].infos) for key in self.loggers.keys()]}; Path: {filename}")
def read_metric(self, metric, robot_name=None):
if robot_name is None:
robot_name = list(self.loggers.keys())[0]
logger = self.loggers[robot_name]
metric_arr = []
for info in logger.infos:
metric_arr += [info[metric]]
return np.array(metric_arr)
def reset(self):
for key, log in self.loggers.items():
log.reset()
class EpisodeLogger:
def __init__(self, cfg):
self.infos = []
self.cfg = cfg
def log(self, info):
for key in info.keys():
if isinstance(info[key], torch.Tensor):
info[key] = info[key].detach().cpu().numpy()
if isinstance(info[key], dict):
continue
elif "image" not in key:
info[key] = copy.deepcopy(info[key])
self.infos += [dict(info)]
def reset(self):
self.infos = []

View File

@ -1,69 +0,0 @@
#!/usr/bin/env python3
from os.path import expanduser
import netifaces
import sys
import subprocess
def get_saved_interface_name():
home = expanduser("~")
name = ""
try:
with open(home + "/.cheetah_network.txt"):
name = f.read().split()[0]
except:
name = ""
return name
def get_likely_iface():
ifs = netifaces.interfaces()
print("Found {} interfaces:".format(len(ifs)))
if_to_addrs = {}
for i in ifs:
if_to_addrs[i] = []
if netifaces.AF_INET in netifaces.ifaddresses(i).keys():
for ad in netifaces.ifaddresses(i)[netifaces.AF_INET]:
if_to_addrs[i].append(ad['addr'])
for i in range(len(ifs)):
print(" [{}] : {} : {}".format(i, ifs[i], if_to_addrs[ifs[i]]))
found_10_ip = 0
selected_if = ""
for i in ifs:
match_string = "192.168.123."
if len(if_to_addrs[i]) > 0 and if_to_addrs[i][0][:len(match_string)] == match_string:
found_10_ip = found_10_ip + 1
selected_if = i
if found_10_ip == 0:
print("None of the network adapters look correct. Make sure you have set a 10.0.0.x static ip!")
return ""
elif found_10_ip == 1:
print("The adapter {} seems correct".format(selected_if))
return selected_if
else:
print("Found {} possible adapters, giving up".format(found_10_ip))
return ""
def main():
name = get_saved_interface_name()
if not name:
print("Didn't find saved interface, searching...")
name = get_likely_iface()
if not name:
sys.exit("Failed to find network adapter name")
else:
print("Found saved interface {}".format(name))
print("Setup for interface {}".format(name))
subprocess.call(['sudo', 'ifconfig', name, 'multicast'])
subprocess.call(['sudo', 'route', 'add', '-net', '224.0.0.0', 'netmask', '240.0.0.0', 'dev', name])
if __name__ == "__main__":
main()