autopep8 is applied

Rooholla-Khorrambakht 2024-05-20 18:45:59 -04:00
parent 4269a7217f
commit 23ee23fd65
41 changed files with 1408 additions and 778 deletions
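The commit does not record how the formatter was invoked; a minimal sketch of reproducing such a pass (the CLI flags and the toy input below are assumptions, not taken from the repository):

import autopep8

# CLI equivalent (hypothetical flags): autopep8 --in-place --recursive --aggressive .
source = "def f( x ):\n    return x==1\n"   # toy, badly formatted input
print(autopep8.fix_code(source, options={"aggressive": 1}))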

View File

@ -5,6 +5,7 @@ import time
import numpy as np
import torch
def loadParameters(path):
with open(path + "/parameters.pkl", "rb") as file:
pkl_cfg = pkl.load(file)
@ -130,7 +131,8 @@ class CommandInterface:
self.stance_width_cmd = 0.25
def setGaitType(self, gait_type):
assert gait_type in [key for key in self.gaits.keys()], f'The gait type should be in {[key for key in self.gaits.keys()]}'
assert gait_type in [key for key in self.gaits.keys()], \
    f'The gait type should be in {[key for key in self.gaits.keys()]}'
self.gait = torch.tensor(self.gaits[gait_type])
def get_command(self):
@ -403,9 +405,9 @@ class WalkTheseWaysAgent:
self.joint_vel_target - self.dof_vel
) * self.d_gains
# self.lcm_bridge.sendCommands(command_for_robot)
self.robot.setCommands(self.joint_pos_target[self.policy_to_unitree_map[:,1]],\
self.joint_vel_target[self.policy_to_unitree_map[:,1]],\
self.p_gains[self.policy_to_unitree_map[:,1]],\
self.robot.setCommands(self.joint_pos_target[self.policy_to_unitree_map[:, 1]],
self.joint_vel_target[self.policy_to_unitree_map[:, 1]],
self.p_gains[self.policy_to_unitree_map[:, 1]],
self.d_gains[self.policy_to_unitree_map[:, 1]],
np.zeros(12))
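The rewrite above also drops the trailing backslashes: once a statement is inside an open bracket, Python continues it across lines implicitly, so the explicit continuations are redundant. A small illustration of the two forms:

# Explicit continuation with a backslash (legal but unnecessary here):
values = (1 + \
          2)

# Implicit continuation inside the brackets, the form autopep8 prefers:
values = (1 +
          2)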

View File

@ -1,5 +1,6 @@
import numpy as np
class HysteresisContactDetector:
def __init__(self, upper_limit, lower_limit):
self.upper_limit = upper_limit

View File

@ -15,6 +15,7 @@ from pygame.locals import (
K_q,
)
@dataclass
class xKeySwitch:
R1: int
@ -45,12 +46,14 @@ class xRockerBtn:
L2: float
ly: float
class PyGameJoyManager:
def __init__(self, user_callback=None):
self.dt = 0.01 # Sampling frequency of the joystick
self.running = False
self.joy_thread_handle = threading.Thread(target=self.joy_thread)
# an optional callback that can be used to send the reading values to a user provided function
# an optional callback that can be used to send the reading values to a
# user provided function
self.user_callback = user_callback
pygame.init()
self.offset = None

View File

@ -17,9 +17,13 @@ class FSM:
self.tracking_kv = np.array(12 * [3.])
self.damping_kv = np.array(12 * [2.])
self.tracking_complete = True
self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))
self.robot.setCommands(
np.zeros(12),
np.zeros(12),
np.zeros(12),
self.damping_kv,
np.zeros(12))
self.fsm_dT = 1. / 50.
self.control_dT = 1. / 50.
self.dT = self.fsm_dT
@ -49,19 +53,33 @@ class FSM:
self.q_target = target
self.time = 0.0
self.move_duration = duration
self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \
True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished
self.q_des = lambda t: [self.q_start +
np.clip((t) /
self.move_duration, 0, 1) *
(self.q_target -
self.q_start), True if np.clip((t) /
self.move_duration, 0, 1) == 1 else False] # q_des(t), Movement finished
self.tracking_complete = False
self.setMode("tracking")
def trackingControlUpdate(self):
self.time += self.dT
q_des, done = self.q_des(self.time)
self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))
self.robot.setCommands(
q_des,
np.zeros(12),
self.tracking_kp,
self.tracking_kv,
np.zeros(12))
self.tracking_complete = done
def dampingControlUpdate(self):
self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))
self.robot.setCommands(
np.zeros(12),
np.zeros(12),
np.zeros(12),
self.damping_kv,
np.zeros(12))
def userControlUpdate(self):
if self.user_controller_callback is not None:
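The q_des lambda built in moveTo above is a clipped linear interpolation from q_start to q_target over move_duration, together with a completion flag; a standalone sketch of the same rule (the 12-element arrays and times are assumptions):

import numpy as np

def interpolate_joints(q_start, q_target, t, duration):
    # Phase ramps from 0 to 1 and saturates once the move time has elapsed.
    phase = np.clip(t / duration, 0.0, 1.0)
    q_des = q_start + phase * (q_target - q_start)
    done = phase == 1.0
    return q_des, done

q_des, done = interpolate_joints(np.zeros(12), np.ones(12), t=0.75, duration=1.5)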
@ -113,7 +131,12 @@ class FSM:
self.setMode("user")
self.dT = self.control_dT
self.state = "user_loop"
self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))
self.robot.setCommands(
np.zeros(12),
np.zeros(12),
np.zeros(12),
np.zeros(12),
np.zeros(12))
elif self.remote.standUpDownSeq() or self.robot.overheat():
self.moveTo(self.robot.sitting_q, duration=1.5)
self.state = "sitting"

View File

@ -21,7 +21,6 @@ from scipy.spatial.transform import Rotation
from Go2Py.joy import xKeySwitch, xRockerBtn
class GO2Real():
def __init__(
self,

View File

@ -18,15 +18,17 @@ class Go2Model:
q_reordering_idx (np.ndarray): Index array for reordering the joint position vector.
ef_J_ (dict): A dictionary storing the Jacobians for the end-effector frames.
"""
def __init__(self):
"""
Initializes the Go2Model class by loading the URDF, setting up the robot model, and calculating initial dimensions.
"""
self.robot = pin.RobotWrapper.BuildFromURDF(urdf_path, urdf_root_path, pin.JointModelFreeFlyer())
self.robot = pin.RobotWrapper.BuildFromURDF(
urdf_path, urdf_root_path, pin.JointModelFreeFlyer())
self.data = self.robot.data
# Standing joint configuration in Unitree Joint order
self.ef_frames = ['FR_foot', 'FL_foot', 'RR_foot', 'RL_foot']
self.dq_reordering_idx = np.array([0, 1, 2, 3, 4, 5,\
self.dq_reordering_idx = np.array([0, 1, 2, 3, 4, 5,
9, 10, 11, 6, 7, 8, 15, 16, 17, 12, 13, 14])
self.q_reordering_idx = np.array([9, 10, 11, 6, 7, 8, 15, 16, 17, 12, 13, 14]) - 6
self.ef_Jb_ = {}
@ -49,11 +51,21 @@ class Go2Model:
q_neutral = np.asarray([0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
self.robot.framesForwardKinematics(q_neutral)
self.h = np.linalg.norm(self.robot.data.oMf[ID_FR_HAA].translation - self.robot.data.oMf[ID_RR_HAA].translation)
self.b = np.linalg.norm(self.robot.data.oMf[ID_FR_HAA].translation - self.robot.data.oMf[ID_FL_HAA].translation)
self.l1 = np.linalg.norm(self.robot.data.oMf[ID_FR_HAA].translation - self.robot.data.oMf[ID_FR_HFE].translation)
self.l2 = np.linalg.norm(self.robot.data.oMf[ID_FR_HFE].translation - self.robot.data.oMf[ID_FR_KFE].translation)
self.l3 = np.linalg.norm(self.robot.data.oMf[ID_FR_KFE].translation - self.robot.data.oMf[ID_FR_FOOT].translation)
self.h = np.linalg.norm(
self.robot.data.oMf[ID_FR_HAA].translation -
self.robot.data.oMf[ID_RR_HAA].translation)
self.b = np.linalg.norm(
self.robot.data.oMf[ID_FR_HAA].translation -
self.robot.data.oMf[ID_FL_HAA].translation)
self.l1 = np.linalg.norm(
self.robot.data.oMf[ID_FR_HAA].translation -
self.robot.data.oMf[ID_FR_HFE].translation)
self.l2 = np.linalg.norm(
self.robot.data.oMf[ID_FR_HFE].translation -
self.robot.data.oMf[ID_FR_KFE].translation)
self.l3 = np.linalg.norm(
self.robot.data.oMf[ID_FR_KFE].translation -
self.robot.data.oMf[ID_FR_FOOT].translation)
self.M_ = None
self.Minv_ = None
self.nle_ = None
@ -133,7 +145,7 @@ class Go2Model:
self.robot.framesForwardKinematics(q_)
ef_frames = ['base_link', 'FR_foot', 'FL_foot', 'RR_foot', 'RL_foot']
data = {}
return {frame:self.robot.data.oMf[self.robot.model.getFrameId(frame)].homogeneous \
return {frame: self.robot.data.oMf[self.robot.model.getFrameId(frame)].homogeneous
for frame in ef_frames}
def updateKinematics(self, q):
@ -147,8 +159,12 @@ class Go2Model:
self.robot.framesForwardKinematics(q)
for ef_frame in self.ef_frames:
Jw = self.robot.getFrameJacobian(self.robot.model.getFrameId(ef_frame), pin.ReferenceFrame.LOCAL_WORLD_ALIGNED)
Jb = self.robot.getFrameJacobian(self.robot.model.getFrameId(ef_frame), pin.ReferenceFrame.LOCAL)
Jw = self.robot.getFrameJacobian(
self.robot.model.getFrameId(ef_frame),
pin.ReferenceFrame.LOCAL_WORLD_ALIGNED)
Jb = self.robot.getFrameJacobian(
self.robot.model.getFrameId(ef_frame),
pin.ReferenceFrame.LOCAL)
self.ef_Jw_[ef_frame] = Jw[:, self.dq_reordering_idx]
self.ef_Jb_[ef_frame] = Jb[:, self.dq_reordering_idx]
@ -165,8 +181,12 @@ class Go2Model:
self.robot.framesForwardKinematics(q_)
for ef_frame in self.ef_frames:
Jw = self.robot.getFrameJacobian(self.robot.model.getFrameId(ef_frame), pin.ReferenceFrame.LOCAL_WORLD_ALIGNED)
Jb = self.robot.getFrameJacobian(self.robot.model.getFrameId(ef_frame), pin.ReferenceFrame.LOCAL)
Jw = self.robot.getFrameJacobian(
self.robot.model.getFrameId(ef_frame),
pin.ReferenceFrame.LOCAL_WORLD_ALIGNED)
Jb = self.robot.getFrameJacobian(
self.robot.model.getFrameId(ef_frame),
pin.ReferenceFrame.LOCAL)
self.ef_Jw_[ef_frame] = Jw[:, self.dq_reordering_idx]
self.ef_Jb_[ef_frame] = Jb[:, self.dq_reordering_idx]
@ -183,10 +203,11 @@ class Go2Model:
self.g_ = self.robot.gravity(q)[self.dq_reordering_idx]
self.M_ = self.robot.mass(q)[self.dq_reordering_idx, :]
self.M_ = self.M_[:, self.dq_reordering_idx]
self.Minv_ = pin.computeMinverse(self.robot.model, self.robot.data, q)[self.dq_reordering_idx,:]
self.Minv_ = pin.computeMinverse(
self.robot.model, self.robot.data, q)[
self.dq_reordering_idx, :]
self.Minv_ = self.Minv_[:, self.dq_reordering_idx]
def updateAll(q, dq):
"""
Updates the dynamic and kinematic parameters based on the given joint configurations and velocities.
@ -231,7 +252,9 @@ class Go2Model:
def getGroundReactionForce(self, tau_est, body_acceleration=None):
if body_acceleration is None:
grf = {key:np.linalg.pinv(self.ef_Jw_[key][:3,6:].T)@(tau_est.squeeze() - self.nle_[6:]) for key in self.ef_Jw_.keys()}
grf = {key: np.linalg.pinv(
self.ef_Jw_[key][:3, 6:].T) @ (tau_est.squeeze() - self.nle_[6:]) for key in self.ef_Jw_.keys()}
else:
raise NotImplementedError("Ground reaction force with body dynamics is not implemented")
raise NotImplementedError(
"Ground reaction force with body dynamics is not implemented")
return grf
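getGroundReactionForce above solves the quasi-static relation tau - nle = J^T f for each foot via a pseudo-inverse of the transposed translational Jacobian; a toy numpy sketch of that single step (the Jacobian, torque, and bias values are made up):

import numpy as np

J_foot = np.random.randn(3, 12)   # translational foot Jacobian w.r.t. the 12 joints
tau_est = np.random.randn(12)     # estimated joint torques
nle = np.random.randn(12)         # nonlinear effects (gravity, Coriolis) on the joints

# tau - nle = J^T f  =>  f = pinv(J^T) @ (tau - nle)
grf = np.linalg.pinv(J_foot.T) @ (tau_est - nle)
print(grf.shape)                  # (3,) estimated contact force at one foot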

View File

@ -1,19 +1,24 @@
import threading
from pynput import keyboard
class BaseRemote:
def __init__(self):
pass
def startSeq(self):
return False
def standUpDownSeq(self):
return False
def flushStates(self):
pass
remote = BaseRemote()
class KeyboardRemote(BaseRemote):
def __init__(self):
super().__init__()
@ -32,7 +37,6 @@ class KeyboardRemote(BaseRemote):
except AttributeError:
pass # Special keys (like space) will be handled here
def _on_release(self, key):
try:
if key.char == 's': # Start sequence
@ -62,6 +66,7 @@ class KeyboardRemote(BaseRemote):
self.stand_up_down_seq_flag = False
self.start_seq_flag = False
class UnitreeRemote(BaseRemote):
def __init__(self, robot):
self.robot = robot
@ -86,4 +91,3 @@ class UnitreeRemote(BaseRemote):
return True
else:
return False

View File

@ -1,4 +1,6 @@
import time
class SafetyHypervisor():
def __init__(self, robot):
self.robot = robot

View File

@ -2,6 +2,10 @@
import os
MINI_GYM_ROOT_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.realpath(__file__))), '..')
MINI_GYM_ROOT_DIR = os.path.join(
os.path.dirname(
os.path.dirname(
os.path.realpath(__file__))),
'..')
print(MINI_GYM_ROOT_DIR)
MINI_GYM_ENVS_DIR = os.path.join(MINI_GYM_ROOT_DIR, 'go_gym', 'envs')

View File

@ -25,7 +25,8 @@ class BaseTask(gym.Env):
sim_device_type, self.sim_device_id = gymutil.parse_device_str(self.sim_device)
self.headless = headless
# env device is GPU only if sim is on GPU and use_gpu_pipeline=True, otherwise returned tensors are copied to CPU by physX.
# env device is GPU only if sim is on GPU and use_gpu_pipeline=True,
# otherwise returned tensors are copied to CPU by physX.
if sim_device_type == 'cuda' and sim_params.use_gpu_pipeline:
self.device = self.sim_device
else:
@ -33,7 +34,7 @@ class BaseTask(gym.Env):
# graphics device for rendering, -1 for no rendering
self.graphics_device_id = self.sim_device_id
if self.headless == True:
if self.headless:
self.graphics_device_id = self.sim_device_id
self.num_obs = cfg.env.num_observations
@ -54,14 +55,21 @@ class BaseTask(gym.Env):
torch._C._jit_set_profiling_executor(False)
# allocate buffers
self.obs_buf = torch.zeros(self.num_envs, self.num_obs, device=self.device, dtype=torch.float)
self.obs_buf = torch.zeros(
self.num_envs,
self.num_obs,
device=self.device,
dtype=torch.float)
self.rew_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.float)
self.rew_buf_pos = torch.zeros(self.num_envs, device=self.device, dtype=torch.float)
self.rew_buf_neg = torch.zeros(self.num_envs, device=self.device, dtype=torch.float)
self.reset_buf = torch.ones(self.num_envs, device=self.device, dtype=torch.long)
self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
self.time_out_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool)
self.privileged_obs_buf = torch.zeros(self.num_envs, self.num_privileged_obs, device=self.device,
self.privileged_obs_buf = torch.zeros(
self.num_envs,
self.num_privileged_obs,
device=self.device,
dtype=torch.float)
# self.num_privileged_obs = self.num_obs
@ -76,7 +84,7 @@ class BaseTask(gym.Env):
self.viewer = None
# if running with a viewer, set up keyboard shortcuts and camera
if self.headless == False:
if not self.headless:
# subscribe to keyboard shortcuts
self.viewer = self.gym.create_viewer(
self.sim, gymapi.CameraProperties())
@ -132,6 +140,6 @@ class BaseTask(gym.Env):
self.gym.poll_viewer_events(self.viewer)
def close(self):
if self.headless == False:
if not self.headless:
self.gym.destroy_viewer(self.viewer)
self.gym.destroy_sim(self.sim)

View File

@ -32,14 +32,18 @@ class Curriculum:
self.indices = indices = {}
for key, v_range in key_ranges.items():
bin_size = (v_range[1] - v_range[0]) / v_range[2]
cfg[key] = np.linspace(v_range[0] + bin_size / 2, v_range[1] - bin_size / 2, v_range[2])
cfg[key] = np.linspace(
v_range[0] + bin_size / 2,
v_range[1] - bin_size / 2,
v_range[2])
indices[key] = np.linspace(0, v_range[2] - 1, v_range[2])
self.lows = np.array([range[0] for range in key_ranges.values()])
self.highs = np.array([range[1] for range in key_ranges.values()])
# self.bin_sizes = {key: arr[1] - arr[0] for key, arr in cfg.items()}
self.bin_sizes = {key: (v_range[1] - v_range[0]) / v_range[2] for key, v_range in key_ranges.items()}
self.bin_sizes = {key: (v_range[1] - v_range[0]) / v_range[2]
for key, v_range in key_ranges.items()}
self._raw_grid = np.stack(np.meshgrid(*cfg.values(), indexing='ij'))
self._idx_grid = np.stack(np.meshgrid(*indices.values(), indexing='ij'))
@ -145,7 +149,8 @@ class RewardThresholdCurriculum(Curriculum):
# if len(is_success) > 0 and is_success.any():
# print("successes")
self.weights[bin_inds[is_success]] = np.clip(self.weights[bin_inds[is_success]] + 0.2, 0, 1)
self.weights[bin_inds[is_success]] = np.clip(
self.weights[bin_inds[is_success]] + 0.2, 0, 1)
adjacents = self.get_local_bins(bin_inds[is_success], ranges=local_range)
for adjacent in adjacents:
# print(adjacent)
@ -158,6 +163,7 @@ class RewardThresholdCurriculum(Curriculum):
self.episode_ang_vel_raw[bin_inds] = ang_vel_raw.cpu().numpy()
self.episode_duration[bin_inds] = episode_duration.cpu().numpy()
if __name__ == '__main__':
r = RewardThresholdCurriculum(100, x=(-1, 1, 5), y=(-1, 1, 2), z=(-1, 1, 11))
@ -171,8 +177,12 @@ if __name__ == '__main__':
for adjacent in adjacents:
adjacent_inds = np.array(adjacent.nonzero()[0])
print(adjacent_inds)
r.update(bin_inds=adjacent_inds, lin_vel_rewards=np.ones_like(adjacent_inds),
ang_vel_rewards=np.ones_like(adjacent_inds), lin_vel_threshold=0.0, ang_vel_threshold=0.0,
r.update(
bin_inds=adjacent_inds,
lin_vel_rewards=np.ones_like(adjacent_inds),
ang_vel_rewards=np.ones_like(adjacent_inds),
lin_vel_threshold=0.0,
ang_vel_threshold=0.0,
local_range=0.5)
samples, bins = r.sample(10_000)
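In the Curriculum constructor above, each command range is discretized into bin centers that sit half a bin away from the range limits; a short sketch of that grid construction (the range and bin count are illustrative):

import numpy as np

low, high, n_bins = -1.0, 1.0, 5
bin_size = (high - low) / n_bins
centers = np.linspace(low + bin_size / 2, high - bin_size / 2, n_bins)
print(centers)   # [-0.8 -0.4  0.   0.4  0.8]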

File diff suppressed because it is too large

View File

@ -8,7 +8,8 @@ class Cfg(PrefixProto, cli=False):
num_envs = 4096
num_observations = 235
num_scalar_observations = 42
# if not None a privileged_obs_buf will be returned by step() (critic obs for asymmetric training). None is returned otherwise
# if not None a privileged_obs_buf will be returned by step() (critic obs
# for asymmetric training). None is returned otherwise
num_privileged_obs = 18
privileged_future_horizon = 1
num_actions = 12
@ -75,7 +76,8 @@ class Cfg(PrefixProto, cli=False):
terrain_smoothness = 0.005
measure_heights = True
# 1mx1.6m rectangle (without center line)
measured_points_x = [-0.8, -0.7, -0.6, -0.5, -0.4, -0.3, -0.2, -0.1, 0., 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
measured_points_x = [-0.8, -0.7, -0.6, -0.5, -0.4, -0.3, -
0.2, -0.1, 0., 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
measured_points_y = [-0.5, -0.4, -0.3, -0.2, -0.1, 0., 0.1, 0.2, 0.3, 0.4, 0.5]
selected = False # select a unique terrain type and pass all arguments
terrain_kwargs = None # Dict of arguments for selected terrain
@ -223,10 +225,12 @@ class Cfg(PrefixProto, cli=False):
penalize_contacts_on = []
terminate_after_contacts_on = []
disable_gravity = False
# merge bodies connected by fixed joints. Specific fixed joints can be kept by adding " <... dont_collapse="true">
# merge bodies connected by fixed joints. Specific fixed joints can be
# kept by adding " <... dont_collapse="true">
collapse_fixed_joints = True
fix_base_link = False # fix the base of the robot
default_dof_drive_mode = 3 # see GymDofDriveModeFlags (0 is none, 1 is pos tgt, 2 is vel tgt, 3 effort)
# see GymDofDriveModeFlags (0 is none, 1 is pos tgt, 2 is vel tgt, 3 effort)
default_dof_drive_mode = 3
self_collisions = 0 # 1 to disable, 0 to enable...bitwise filter
# replace collision cylinders with capsules, leads to faster/more stable simulation
replace_cylinder_with_capsule = True
@ -270,7 +274,8 @@ class Cfg(PrefixProto, cli=False):
lag_timesteps = 6
class rewards(PrefixProto, cli=False):
only_positive_rewards = True # if true negative total rewards are clipped at zero (avoids early termination problems)
# if true negative total rewards are clipped at zero (avoids early termination problems)
only_positive_rewards = True
only_positive_rewards_ji22_style = False
sigma_rew_neg = 5
reward_container_name = "CoRLRewards"

View File

@ -8,22 +8,37 @@ from Go2Py.sim.gym.envs.base.legged_robot_config import Cfg
class VelocityTrackingEasyEnv(LeggedRobot):
def __init__(self, sim_device, headless, num_envs=None, prone=False, deploy=False,
cfg: Cfg = None, eval_cfg: Cfg = None, initial_dynamics_dict=None, physics_engine="SIM_PHYSX"):
def __init__(
self,
sim_device,
headless,
num_envs=None,
prone=False,
deploy=False,
cfg: Cfg = None,
eval_cfg: Cfg = None,
initial_dynamics_dict=None,
physics_engine="SIM_PHYSX"):
if num_envs is not None:
cfg.env.num_envs = num_envs
sim_params = gymapi.SimParams()
gymutil.parse_sim_config(vars(cfg.sim), sim_params)
super().__init__(cfg, sim_params, physics_engine, sim_device, headless, eval_cfg, initial_dynamics_dict)
super().__init__(
cfg,
sim_params,
physics_engine,
sim_device,
headless,
eval_cfg,
initial_dynamics_dict)
def step(self, actions):
self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras = super().step(actions)
self.foot_positions = self.rigid_body_state.view(self.num_envs, self.num_bodies, 13)[:, self.feet_indices,
0:3]
self.foot_positions = self.rigid_body_state.view(self.num_envs, self.num_bodies, 13)[
:, self.feet_indices, 0:3]
self.extras.update({
"privileged_obs": self.privileged_obs_buf,
@ -45,6 +60,6 @@ class VelocityTrackingEasyEnv(LeggedRobot):
def reset(self):
self.reset_idx(torch.arange(self.num_envs, device=self.device))
obs, _, _, _ = self.step(torch.zeros(self.num_envs, self.num_actions, device=self.device, requires_grad=False))
obs, _, _, _ = self.step(torch.zeros(self.num_envs, self.num_actions,
device=self.device, requires_grad=False))
return obs

View File

@ -4,6 +4,7 @@ from Go2Py.sim.gym.utils.math_utils import quat_apply_yaw, wrap_to_pi, get_scale
from isaacgym.torch_utils import *
from isaacgym import gymapi
class CoRLRewards:
def __init__(self, env):
self.env = env
@ -14,7 +15,8 @@ class CoRLRewards:
# ------------ reward functions----------------
def _reward_tracking_lin_vel(self):
# Tracking of linear velocity commands (xy axes)
lin_vel_error = torch.sum(torch.square(self.env.commands[:, :2] - self.env.base_lin_vel[:, :2]), dim=1)
lin_vel_error = torch.sum(torch.square(
self.env.commands[:, :2] - self.env.base_lin_vel[:, :2]), dim=1)
return torch.exp(-lin_vel_error / self.env.cfg.rewards.tracking_sigma)
def _reward_tracking_ang_vel(self):
@ -40,7 +42,12 @@ class CoRLRewards:
def _reward_dof_acc(self):
# Penalize dof accelerations
return torch.sum(torch.square((self.env.last_dof_vel - self.env.dof_vel) / self.env.dt), dim=1)
return torch.sum(
torch.square(
(self.env.last_dof_vel -
self.env.dof_vel) /
self.env.dt),
dim=1)
def _reward_action_rate(self):
# Penalize changes in actions
@ -48,12 +55,13 @@ class CoRLRewards:
def _reward_collision(self):
# Penalize collisions on selected bodies
return torch.sum(1. * (torch.norm(self.env.contact_forces[:, self.env.penalised_contact_indices, :], dim=-1) > 0.1),
dim=1)
return torch.sum(
1. * (torch.norm(self.env.contact_forces[:, self.env.penalised_contact_indices, :], dim=-1) > 0.1), dim=1)
def _reward_dof_pos_limits(self):
# Penalize dof positions too close to the limit
out_of_limits = -(self.env.dof_pos - self.env.dof_pos_limits[:, 0]).clip(max=0.) # lower limit
out_of_limits = -(self.env.dof_pos -
self.env.dof_pos_limits[:, 0]).clip(max=0.) # lower limit
out_of_limits += (self.env.dof_pos - self.env.dof_pos_limits[:, 1]).clip(min=0.)
return torch.sum(out_of_limits, dim=1)
@ -79,8 +87,9 @@ class CoRLRewards:
desired_contact = self.env.desired_contact_states
reward = 0
for i in range(4):
reward += - (desired_contact[:, i] * (
1 - torch.exp(-1 * foot_velocities[:, i] ** 2 / self.env.cfg.rewards.gait_vel_sigma)))
reward += - (desired_contact[:,
i] * (1 - torch.exp(-1 * foot_velocities[:,
i] ** 2 / self.env.cfg.rewards.gait_vel_sigma)))
return reward / 4
def _reward_dof_pos(self):
@ -93,13 +102,18 @@ class CoRLRewards:
def _reward_action_smoothness_1(self):
# Penalize changes in actions
diff = torch.square(self.env.joint_pos_target[:, :self.env.num_actuated_dof] - self.env.last_joint_pos_target[:, :self.env.num_actuated_dof])
diff = torch.square(self.env.joint_pos_target[:,
:self.env.num_actuated_dof] - self.env.last_joint_pos_target[:,
:self.env.num_actuated_dof])
diff = diff * (self.env.last_actions[:, :self.env.num_dof] != 0) # ignore first step
return torch.sum(diff, dim=1)
def _reward_action_smoothness_2(self):
# Penalize changes in actions
diff = torch.square(self.env.joint_pos_target[:, :self.env.num_actuated_dof] - 2 * self.env.last_joint_pos_target[:, :self.env.num_actuated_dof] + self.env.last_last_joint_pos_target[:, :self.env.num_actuated_dof])
diff = torch.square(self.env.joint_pos_target[:,
:self.env.num_actuated_dof] - 2 * self.env.last_joint_pos_target[:,
:self.env.num_actuated_dof] + self.env.last_last_joint_pos_target[:,
:self.env.num_actuated_dof])
diff = diff * (self.env.last_actions[:, :self.env.num_dof] != 0) # ignore first step
diff = diff * (self.env.last_last_actions[:, :self.env.num_dof] != 0) # ignore second step
return torch.sum(diff, dim=1)
@ -108,55 +122,66 @@ class CoRLRewards:
contact = self.env.contact_forces[:, self.env.feet_indices, 2] > 1.
contact_filt = torch.logical_or(contact, self.env.last_contacts)
self.env.last_contacts = contact
foot_velocities = torch.square(torch.norm(self.env.foot_velocities[:, :, 0:2], dim=2).view(self.env.num_envs, -1))
foot_velocities = torch.square(torch.norm(
self.env.foot_velocities[:, :, 0:2], dim=2).view(self.env.num_envs, -1))
rew_slip = torch.sum(contact_filt * foot_velocities, dim=1)
return rew_slip
def _reward_feet_contact_vel(self):
reference_heights = 0
near_ground = self.env.foot_positions[:, :, 2] - reference_heights < 0.03
foot_velocities = torch.square(torch.norm(self.env.foot_velocities[:, :, 0:3], dim=2).view(self.env.num_envs, -1))
foot_velocities = torch.square(torch.norm(
self.env.foot_velocities[:, :, 0:3], dim=2).view(self.env.num_envs, -1))
rew_contact_vel = torch.sum(near_ground * foot_velocities, dim=1)
return rew_contact_vel
def _reward_feet_contact_forces(self):
# penalize high contact forces
return torch.sum((torch.norm(self.env.contact_forces[:, self.env.feet_indices, :],
dim=-1) - self.env.cfg.rewards.max_contact_force).clip(min=0.), dim=1)
return torch.sum((torch.norm(self.env.contact_forces[:,
self.env.feet_indices,
:],
dim=-1) - self.env.cfg.rewards.max_contact_force).clip(min=0.),
dim=1)
def _reward_feet_clearance_cmd_linear(self):
phases = 1 - torch.abs(1.0 - torch.clip((self.env.foot_indices * 2.0) - 1.0, 0.0, 1.0) * 2.0)
foot_height = (self.env.foot_positions[:, :, 2]).view(self.env.num_envs, -1)# - reference_heights
target_height = self.env.commands[:, 9].unsqueeze(1) * phases + 0.02 # offset for foot radius 2cm
rew_foot_clearance = torch.square(target_height - foot_height) * (1 - self.env.desired_contact_states)
phases = 1 - \
torch.abs(1.0 - torch.clip((self.env.foot_indices * 2.0) - 1.0, 0.0, 1.0) * 2.0)
foot_height = (self.env.foot_positions[:, :, 2]).view(
self.env.num_envs, -1) # - reference_heights
target_height = self.env.commands[:, 9].unsqueeze(
1) * phases + 0.02 # offset for foot radius 2cm
rew_foot_clearance = torch.square(
target_height - foot_height) * (1 - self.env.desired_contact_states)
return torch.sum(rew_foot_clearance, dim=1)
def _reward_feet_impact_vel(self):
prev_foot_velocities = self.env.prev_foot_velocities[:, :, 2].view(self.env.num_envs, -1)
contact_states = torch.norm(self.env.contact_forces[:, self.env.feet_indices, :], dim=-1) > 1.0
contact_states = torch.norm(
self.env.contact_forces[:, self.env.feet_indices, :], dim=-1) > 1.0
rew_foot_impact_vel = contact_states * torch.square(torch.clip(prev_foot_velocities, -100, 0))
rew_foot_impact_vel = contact_states * \
torch.square(torch.clip(prev_foot_velocities, -100, 0))
return torch.sum(rew_foot_impact_vel, dim=1)
def _reward_collision(self):
# Penalize collisions on selected bodies
return torch.sum(1. * (torch.norm(self.env.contact_forces[:, self.env.penalised_contact_indices, :], dim=-1) > 0.1),
dim=1)
return torch.sum(
1. * (torch.norm(self.env.contact_forces[:, self.env.penalised_contact_indices, :], dim=-1) > 0.1), dim=1)
def _reward_orientation_control(self):
# Penalize non flat base orientation
roll_pitch_commands = self.env.commands[:, 10:12]
quat_roll = quat_from_angle_axis(-roll_pitch_commands[:, 1],
torch.tensor([1, 0, 0], device=self.env.device, dtype=torch.float))
quat_pitch = quat_from_angle_axis(-roll_pitch_commands[:, 0],
torch.tensor([0, 1, 0], device=self.env.device, dtype=torch.float))
quat_roll = quat_from_angle_axis(-roll_pitch_commands[:, 1], torch.tensor(
[1, 0, 0], device=self.env.device, dtype=torch.float))
quat_pitch = quat_from_angle_axis(-roll_pitch_commands[:, 0], torch.tensor(
[0, 1, 0], device=self.env.device, dtype=torch.float))
desired_base_quat = quat_mul(quat_roll, quat_pitch)
desired_projected_gravity = quat_rotate_inverse(desired_base_quat, self.env.gravity_vec)
return torch.sum(torch.square(self.env.projected_gravity[:, :2] - desired_projected_gravity[:, :2]), dim=1)
return torch.sum(torch.square(
self.env.projected_gravity[:, :2] - desired_projected_gravity[:, :2]), dim=1)
def _reward_raibert_heuristic(self):
cur_footsteps_translated = self.env.foot_positions - self.env.base_pos.unsqueeze(1)
@ -168,17 +193,41 @@ class CoRLRewards:
# nominal positions: [FR, FL, RR, RL]
if self.env.cfg.commands.num_commands >= 13:
desired_stance_width = self.env.commands[:, 12:13]
desired_ys_nom = torch.cat([desired_stance_width / 2, -desired_stance_width / 2, desired_stance_width / 2, -desired_stance_width / 2], dim=1)
desired_ys_nom = torch.cat([desired_stance_width /
2, -
desired_stance_width /
2, desired_stance_width /
2, -
desired_stance_width /
2], dim=1)
else:
desired_stance_width = 0.3
desired_ys_nom = torch.tensor([desired_stance_width / 2, -desired_stance_width / 2, desired_stance_width / 2, -desired_stance_width / 2], device=self.env.device).unsqueeze(0)
desired_ys_nom = torch.tensor([desired_stance_width /
2, -
desired_stance_width /
2, desired_stance_width /
2, -
desired_stance_width /
2], device=self.env.device).unsqueeze(0)
if self.env.cfg.commands.num_commands >= 14:
desired_stance_length = self.env.commands[:, 13:14]
desired_xs_nom = torch.cat([desired_stance_length / 2, desired_stance_length / 2, -desired_stance_length / 2, -desired_stance_length / 2], dim=1)
desired_xs_nom = torch.cat([desired_stance_length /
2, desired_stance_length /
2, -
desired_stance_length /
2, -
desired_stance_length /
2], dim=1)
else:
desired_stance_length = 0.45
desired_xs_nom = torch.tensor([desired_stance_length / 2, desired_stance_length / 2, -desired_stance_length / 2, -desired_stance_length / 2], device=self.env.device).unsqueeze(0)
desired_xs_nom = torch.tensor([desired_stance_length /
2, desired_stance_length /
2, -
desired_stance_length /
2, -
desired_stance_length /
2], device=self.env.device).unsqueeze(0)
# raibert offsets
phases = torch.abs(1.0 - (self.env.foot_indices * 2.0)) * 1.0 - 0.5
@ -193,9 +242,11 @@ class CoRLRewards:
desired_ys_nom = desired_ys_nom + desired_ys_offset
desired_xs_nom = desired_xs_nom + desired_xs_offset
desired_footsteps_body_frame = torch.cat((desired_xs_nom.unsqueeze(2), desired_ys_nom.unsqueeze(2)), dim=2)
desired_footsteps_body_frame = torch.cat(
(desired_xs_nom.unsqueeze(2), desired_ys_nom.unsqueeze(2)), dim=2)
err_raibert_heuristic = torch.abs(desired_footsteps_body_frame - footsteps_in_body_frame[:, :, 0:2])
err_raibert_heuristic = torch.abs(
desired_footsteps_body_frame - footsteps_in_body_frame[:, :, 0:2])
reward = torch.sum(torch.square(err_raibert_heuristic), dim=(1, 2))
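Several of the tracking terms above share the exp(-error / sigma) shaping: zero tracking error gives a reward of 1 and the reward decays smoothly with the squared error. A minimal sketch (the sigma and velocity values are assumptions):

import torch

tracking_sigma = 0.25
commanded_vel = torch.tensor([[0.5, 0.0]])   # commanded xy base velocity
measured_vel = torch.tensor([[0.3, 0.1]])    # measured xy base velocity

lin_vel_error = torch.sum(torch.square(commanded_vel - measured_vel), dim=1)
reward = torch.exp(-lin_vel_error / tracking_sigma)
print(reward)   # approaches 1 as the command is tracked exactly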

View File

@ -1,7 +1,8 @@
import gym
import torch
import isaacgym
assert isaacgym
import torch
import gym
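Isaac Gym refuses to load when torch has already been imported, which is why the original file imports isaacgym first and pins it with the assert; if a formatter places gym or torch ahead of it, the module fails at import time. A sketch of the ordering the assert protects:

import isaacgym    # must be imported before torch
assert isaacgym    # marks the import as used so it is not removed as unused
import torch
import gym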
class HistoryWrapper(gym.Wrapper):
def __init__(self, env):
@ -21,7 +22,8 @@ class HistoryWrapper(gym.Wrapper):
privileged_obs = info["privileged_obs"]
self.obs_history = torch.cat((self.obs_history[:, self.env.num_obs:], obs), dim=-1)
return {'obs': obs, 'privileged_obs': privileged_obs, 'obs_history': self.obs_history}, rew, done, info
return {'obs': obs, 'privileged_obs': privileged_obs,
'obs_history': self.obs_history}, rew, done, info
def get_observations(self):
obs = self.env.get_observations()
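The obs_history update in step above is a fixed-length rolling buffer: the oldest num_obs entries are dropped and the newest observation is appended along the last dimension. A toy sketch with made-up sizes:

import torch

num_envs, num_obs, history_len = 2, 3, 4
obs_history = torch.zeros(num_envs, num_obs * history_len)

new_obs = torch.ones(num_envs, num_obs)
obs_history = torch.cat((obs_history[:, num_obs:], new_obs), dim=-1)
print(obs_history.shape)   # torch.Size([2, 12]); the newest obs fills the last 3 columns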

View File

@ -30,10 +30,8 @@ class Terrain:
self.heightsamples = self.height_field_raw
if self.type == "trimesh":
self.vertices, self.triangles = terrain_utils.convert_heightfield_to_trimesh(self.height_field_raw,
self.cfg.horizontal_scale,
self.cfg.vertical_scale,
self.cfg.slope_treshold)
self.vertices, self.triangles = terrain_utils.convert_heightfield_to_trimesh(
self.height_field_raw, self.cfg.horizontal_scale, self.cfg.vertical_scale, self.cfg.slope_treshold)
def load_cfgs(self):
self._load_cfg(self.cfg)
@ -45,14 +43,16 @@ class Terrain:
return self.cfg.row_indices, self.cfg.col_indices, [], []
else:
self._load_cfg(self.eval_cfg)
self.eval_cfg.row_indices = np.arange(self.cfg.tot_rows, self.cfg.tot_rows + self.eval_cfg.tot_rows)
self.eval_cfg.row_indices = np.arange(
self.cfg.tot_rows, self.cfg.tot_rows + self.eval_cfg.tot_rows)
self.eval_cfg.col_indices = np.arange(0, self.eval_cfg.tot_cols)
self.eval_cfg.x_offset = self.cfg.tot_rows
self.eval_cfg.rows_offset = self.cfg.num_rows
return self.cfg.row_indices, self.cfg.col_indices, self.eval_cfg.row_indices, self.eval_cfg.col_indices
def _load_cfg(self, cfg):
cfg.proportions = [np.sum(cfg.terrain_proportions[:i + 1]) for i in range(len(cfg.terrain_proportions))]
cfg.proportions = [np.sum(cfg.terrain_proportions[:i + 1])
for i in range(len(cfg.terrain_proportions))]
cfg.num_sub_terrains = cfg.num_rows * cfg.num_cols
cfg.env_origins = np.zeros((cfg.num_rows, cfg.num_cols, 3))
@ -128,32 +128,54 @@ class Terrain:
terrain_utils.pyramid_sloped_terrain(terrain, slope=slope, platform_size=3.)
elif choice < proportions[1]:
terrain_utils.pyramid_sloped_terrain(terrain, slope=slope, platform_size=3.)
terrain_utils.random_uniform_terrain(terrain, min_height=-0.05, max_height=0.05,
step=self.cfg.terrain_smoothness, downsampled_scale=0.2)
terrain_utils.random_uniform_terrain(
terrain,
min_height=-0.05,
max_height=0.05,
step=self.cfg.terrain_smoothness,
downsampled_scale=0.2)
elif choice < proportions[3]:
if choice < proportions[2]:
step_height *= -1
terrain_utils.pyramid_stairs_terrain(terrain, step_width=0.31, step_height=step_height, platform_size=3.)
terrain_utils.pyramid_stairs_terrain(
terrain, step_width=0.31, step_height=step_height, platform_size=3.)
elif choice < proportions[4]:
num_rectangles = 20
rectangle_min_size = 1.
rectangle_max_size = 2.
terrain_utils.discrete_obstacles_terrain(terrain, discrete_obstacles_height, rectangle_min_size,
rectangle_max_size, num_rectangles, platform_size=3.)
terrain_utils.discrete_obstacles_terrain(
terrain,
discrete_obstacles_height,
rectangle_min_size,
rectangle_max_size,
num_rectangles,
platform_size=3.)
elif choice < proportions[5]:
terrain_utils.stepping_stones_terrain(terrain, stone_size=stepping_stones_size,
stone_distance=stone_distance, max_height=0., platform_size=4.)
terrain_utils.stepping_stones_terrain(
terrain,
stone_size=stepping_stones_size,
stone_distance=stone_distance,
max_height=0.,
platform_size=4.)
elif choice < proportions[6]:
pass
elif choice < proportions[7]:
pass
elif choice < proportions[8]:
terrain_utils.random_uniform_terrain(terrain, min_height=-cfg.terrain_noise_magnitude,
max_height=cfg.terrain_noise_magnitude, step=0.005,
terrain_utils.random_uniform_terrain(
terrain,
min_height=-
cfg.terrain_noise_magnitude,
max_height=cfg.terrain_noise_magnitude,
step=0.005,
downsampled_scale=0.2)
elif choice < proportions[9]:
terrain_utils.random_uniform_terrain(terrain, min_height=-0.05, max_height=0.05,
step=self.cfg.terrain_smoothness, downsampled_scale=0.2)
terrain_utils.random_uniform_terrain(
terrain,
min_height=-0.05,
max_height=0.05,
step=self.cfg.terrain_smoothness,
downsampled_scale=0.2)
terrain.height_field_raw[0:terrain.length // 2, :] = 0
return terrain
@ -174,7 +196,7 @@ class Terrain:
x2 = int((cfg.terrain_length / 2. + 1) / terrain.horizontal_scale) + cfg.x_offset
y1 = int((cfg.terrain_width / 2. - 1) / terrain.horizontal_scale)
y2 = int((cfg.terrain_width / 2. + 1) / terrain.horizontal_scale)
env_origin_z = np.max(self.height_field_raw[start_x: end_x, start_y:end_y]) * terrain.vertical_scale
env_origin_z = np.max(
self.height_field_raw[start_x: end_x, start_y:end_y]) * terrain.vertical_scale
cfg.env_origins[i, j] = [env_origin_x, env_origin_y, env_origin_z]

View File

@ -1,5 +1,18 @@
# launch Isaac Sim before any other imports
# default first two lines in any standalone application
from Go2Py.sim.utils import (
NumpyMemMapDataPipe,
load_config,
simulationManager,
)
from Go2Py.sim.isaacsim.utils import AnnotatorManager
from Go2Py.sim.isaacsim.go2 import UnitreeGo2
from Go2Py.sim.isaacsim.lcm_types.unitree_lowlevel import UnitreeLowCommand
import Go2Py
from omni.isaac.sensor import RotatingLidarPhysX
from omni.isaac.core.utils.prims import define_prim, get_prim_at_path
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.core import World
import pickle
import sys
import time
@ -9,20 +22,7 @@ from omni.isaac.kit import SimulationApp
simulation_app = SimulationApp({"headless": False}) # we can also run as headless.
# import cv2
from omni.isaac.core import World
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.core.utils.prims import define_prim, get_prim_at_path
from omni.isaac.sensor import RotatingLidarPhysX
import Go2Py
from Go2Py.sim.isaacsim.lcm_types.unitree_lowlevel import UnitreeLowCommand
from Go2Py.sim.isaacsim.go2 import UnitreeGo2
from Go2Py.sim.isaacsim.utils import AnnotatorManager
from Go2Py.sim.utils import (
NumpyMemMapDataPipe,
load_config,
simulationManager,
)
cfg = load_config(Go2Py.GO2_ISAACSIM_CFG_PATH)
robots = cfg["robots"]
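The hunk above moves the omni.isaac imports ahead of the SimulationApp construction, while the retained comment still asks for Isaac Sim to be launched before any other imports; omni.isaac.* modules generally only import cleanly after the app exists. A sketch of the ordering that comment describes (module names as used in the file):

# launch Isaac Sim before any other imports
from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp({"headless": False})

# Only pull in the rest of the omni.isaac stack once the app exists.
from omni.isaac.core import World                    # noqa: E402
from omni.isaac.sensor import RotatingLidarPhysX     # noqa: E402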

View File

@ -9,6 +9,8 @@ from pinocchio.robot_wrapper import RobotWrapper
from Go2Py import ASSETS_PATH
import os
from scipy.spatial.transform import Rotation
class Go2Sim:
def __init__(self, render=True, dt=0.002):
@ -104,7 +106,7 @@ class Go2Sim:
def getIMU(self):
return {
'accel':np.array(self.data.sensordata[0:3]),\
'accel': np.array(self.data.sensordata[0:3]),
'gyro': np.array(self.data.sensordata[3:6])
}

View File

@ -10,6 +10,7 @@ def load_config(file_path):
config = yaml.safe_load(file)
return config
class NumpyMemMapDataPipe:
def __init__(self, channel_name, force=False, dtype="uint8", shape=(640, 480, 3)):
self.channel_name = channel_name

View File

@ -7,6 +7,7 @@ from launch.substitutions import Command, FindExecutable, LaunchConfiguration
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
# go2_xacro_file = os.path.join(
# get_package_share_directory("go2_description"), "xacro", "robot_virtual_arm.xacro"

View File

@ -7,6 +7,7 @@ from launch.substitutions import Command, FindExecutable, LaunchConfiguration
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
go2_xacro_file = os.path.join(
get_package_share_directory("go2_description"), "xacro", "robot.xacro"

View File

@ -1,6 +1,8 @@
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
rviz_config = get_package_share_directory('hesai_ros_driver') + '/rviz/rviz2.rviz'
return LaunchDescription([

View File

@ -2,11 +2,22 @@ from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
rviz_config = get_package_share_directory('hesai_ros_driver') + '/rviz/rviz2.rviz'
return LaunchDescription([
Node(namespace='hesai_ros_driver', package='hesai_ros_driver', executable='hesai_ros_driver_node', output='screen'),
Node(namespace='rviz2', package='rviz2', executable='rviz2', arguments=['-d',rviz_config])
])
return LaunchDescription(
[
Node(
namespace='hesai_ros_driver',
package='hesai_ros_driver',
executable='hesai_ros_driver_node',
output='screen'),
Node(
namespace='rviz2',
package='rviz2',
executable='rviz2',
arguments=[
'-d',
rviz_config])])

View File

@ -23,6 +23,7 @@ import pathlib
import launch.actions
from launch.actions import DeclareLaunchArgument
def generate_launch_description():
return LaunchDescription([
launch_ros.actions.Node(

View File

@ -51,5 +51,6 @@ def main():
robot.close()
exit()
if __name__ == '__main__':
main()

View File

@ -25,5 +25,3 @@ class Go2pyHighCmd_(idl.IdlStruct, typename="go2py_messages.msg.dds_.Go2pyHighCm
vx: types.float32
vy: types.float32
vz: types.float32

View File

@ -27,5 +27,3 @@ class Go2pyLowCmd_(idl.IdlStruct, typename="go2py_messages.msg.dds_.Go2pyLowCmd_
kp: types.array[types.float32, 12]
kd: types.array[types.float32, 12]
tau: types.array[types.float32, 12]

View File

@ -35,5 +35,3 @@ class Go2pyState_(idl.IdlStruct, typename="go2py_messages.msg.dds_.Go2pyState_")
motor_temp: types.array[types.float32, 12]
wireless_remote: types.array[types.uint8, 40]
soc: types.uint8