commit 25877c7eaf
    first

@@ -0,0 +1,23 @@
BSD 3-Clause License
Copyright (c) 2016-2023 HangZhou YuShu TECHNOLOGY CO.,LTD. ("Unitree Robotics")
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
   list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
   this list of conditions and the following disclaimer in the documentation
   and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
   contributors may be used to endorse or promote products derived from
   this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
@@ -0,0 +1 @@
For detailed usage instructions, please refer to https://support.unitree.com/home/zh/developer/rl_example.
@@ -0,0 +1,31 @@
Copyright (c) 2021, ETH Zurich, Nikita Rudin
Copyright (c) 2021, NVIDIA CORPORATION & AFFILIATES
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

3. Neither the name of the copyright holder nor the names of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

See licenses/assets for license information for assets included in this repository.
See licenses/dependencies for license information of dependencies of this package.
@@ -0,0 +1,4 @@
import os

LEGGED_GYM_ROOT_DIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
LEGGED_GYM_ENVS_DIR = os.path.join(LEGGED_GYM_ROOT_DIR, 'legged_gym', 'envs')
@@ -0,0 +1,9 @@

from legged_gym import LEGGED_GYM_ROOT_DIR, LEGGED_GYM_ENVS_DIR

from legged_gym.envs.go2.go2_config import GO2RoughCfg, GO2RoughCfgPPO
from .base.legged_robot import LeggedRobot

from legged_gym.utils.task_registry import task_registry
task_registry.register("go2", LeggedRobot, GO2RoughCfg(), GO2RoughCfgPPO())
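The registration call above is the single hook a new task needs. For illustration only, a minimal sketch of registering a hypothetical flat-terrain Go2 variant alongside the default task; GO2FlatCfg and GO2FlatCfgPPO are placeholder names, and they only override fields that already exist in the configs added further down in this commit.

from legged_gym.envs.base.legged_robot import LeggedRobot
from legged_gym.envs.go2.go2_config import GO2RoughCfg, GO2RoughCfgPPO
from legged_gym.utils.task_registry import task_registry

class GO2FlatCfg(GO2RoughCfg):                 # hypothetical variant config
    class terrain(GO2RoughCfg.terrain):
        mesh_type = 'plane'                    # flat ground instead of rough terrain
        measure_heights = False

class GO2FlatCfgPPO(GO2RoughCfgPPO):
    class runner(GO2RoughCfgPPO.runner):
        experiment_name = 'flat_go2'           # logs go to a separate folder

# same signature as the "go2" registration above
task_registry.register("go2_flat", LeggedRobot, GO2FlatCfg(), GO2FlatCfgPPO())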
@@ -0,0 +1,25 @@
import inspect

class BaseConfig:
    def __init__(self) -> None:
        """ Initializes all member classes recursively. Ignores all names starting with '__' (built-in methods)."""
        self.init_member_classes(self)

    @staticmethod
    def init_member_classes(obj):
        # iterate over all attribute names
        for key in dir(obj):
            # disregard builtin attributes
            # if key.startswith("__"):
            if key=="__class__":
                continue
            # get the corresponding attribute object
            var = getattr(obj, key)
            # check if the attribute is a class
            if inspect.isclass(var):
                # instantiate the class
                i_var = var()
                # set the attribute to the instance instead of the type
                setattr(obj, key, i_var)
                # recursively init members of the attribute
                BaseConfig.init_member_classes(i_var)
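What the recursive constructor buys you: nested classes declared as plain types on a config become instances on construction, so values can be read and overridden per object without touching the class defaults. A small sketch, assuming BaseConfig is importable as above (DemoCfg is a made-up name):

class DemoCfg(BaseConfig):
    class env:
        num_envs = 4096
        class control:
            decimation = 4

cfg = DemoCfg()
# cfg.env and cfg.env.control are now instances, not classes,
# so per-object overrides leave the class defaults untouched:
cfg.env.num_envs = 64
print(cfg.env.num_envs, DemoCfg.env.num_envs)   # 64 4096
print(cfg.env.control.decimation)               # 4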
@@ -0,0 +1,115 @@

import sys
from isaacgym import gymapi
from isaacgym import gymutil
import numpy as np
import torch

# Base class for RL tasks
class BaseTask():

    def __init__(self, cfg, sim_params, physics_engine, sim_device, headless):
        self.gym = gymapi.acquire_gym()

        self.sim_params = sim_params
        self.physics_engine = physics_engine
        self.sim_device = sim_device
        sim_device_type, self.sim_device_id = gymutil.parse_device_str(self.sim_device)
        self.headless = headless

        # env device is GPU only if sim is on GPU and use_gpu_pipeline=True, otherwise returned tensors are copied to CPU by physX.
        if sim_device_type=='cuda' and sim_params.use_gpu_pipeline:
            self.device = self.sim_device
        else:
            self.device = 'cpu'

        # graphics device for rendering, -1 for no rendering
        self.graphics_device_id = self.sim_device_id
        if self.headless == True:
            self.graphics_device_id = -1

        self.num_envs = cfg.env.num_envs
        self.num_obs = cfg.env.num_observations
        self.num_privileged_obs = cfg.env.num_privileged_obs
        self.num_actions = cfg.env.num_actions

        # optimization flags for pytorch JIT
        torch._C._jit_set_profiling_mode(False)
        torch._C._jit_set_profiling_executor(False)

        # allocate buffers
        self.obs_buf = torch.zeros(self.num_envs, self.num_obs, device=self.device, dtype=torch.float)
        self.rew_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.float)
        self.reset_buf = torch.ones(self.num_envs, device=self.device, dtype=torch.long)
        self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
        self.time_out_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool)
        if self.num_privileged_obs is not None:
            self.privileged_obs_buf = torch.zeros(self.num_envs, self.num_privileged_obs, device=self.device, dtype=torch.float)
        else:
            self.privileged_obs_buf = None
            # self.num_privileged_obs = self.num_obs

        self.extras = {}

        # create envs, sim and viewer
        self.create_sim()
        self.gym.prepare_sim(self.sim)

        # todo: read from config
        self.enable_viewer_sync = True
        self.viewer = None

        # if running with a viewer, set up keyboard shortcuts and camera
        if self.headless == False:
            # subscribe to keyboard shortcuts
            self.viewer = self.gym.create_viewer(
                self.sim, gymapi.CameraProperties())
            self.gym.subscribe_viewer_keyboard_event(
                self.viewer, gymapi.KEY_ESCAPE, "QUIT")
            self.gym.subscribe_viewer_keyboard_event(
                self.viewer, gymapi.KEY_V, "toggle_viewer_sync")

    def get_observations(self):
        return self.obs_buf

    def get_privileged_observations(self):
        return self.privileged_obs_buf

    def reset_idx(self, env_ids):
        """Reset selected robots"""
        raise NotImplementedError

    def reset(self):
        """ Reset all robots"""
        self.reset_idx(torch.arange(self.num_envs, device=self.device))
        obs, privileged_obs, _, _, _ = self.step(torch.zeros(self.num_envs, self.num_actions, device=self.device, requires_grad=False))
        return obs, privileged_obs

    def step(self, actions):
        raise NotImplementedError

    def render(self, sync_frame_time=True):
        if self.viewer:
            # check for window closed
            if self.gym.query_viewer_has_closed(self.viewer):
                sys.exit()

            # check for keyboard events
            for evt in self.gym.query_viewer_action_events(self.viewer):
                if evt.action == "QUIT" and evt.value > 0:
                    sys.exit()
                elif evt.action == "toggle_viewer_sync" and evt.value > 0:
                    self.enable_viewer_sync = not self.enable_viewer_sync

            # fetch results
            if self.device != 'cpu':
                self.gym.fetch_results(self.sim, True)

            # step graphics
            if self.enable_viewer_sync:
                self.gym.step_graphics(self.sim)
                self.gym.draw_viewer(self.viewer, self.sim, True)
                if sync_frame_time:
                    self.gym.sync_frame_time(self.sim)
            else:
                self.gym.poll_viewer_events(self.viewer)
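BaseTask leaves create_sim(), step() and reset_idx() to the subclass (LeggedRobot below provides the real implementations). As a reading aid only, a deliberately bare sketch of that contract; the buffer updates here are made up and not part of the commit:

class MinimalTask(BaseTask):
    def create_sim(self):
        # called from BaseTask.__init__ before prepare_sim()
        self.sim = self.gym.create_sim(self.sim_device_id, self.graphics_device_id,
                                       self.physics_engine, self.sim_params)
        self.gym.add_ground(self.sim, gymapi.PlaneParams())

    def reset_idx(self, env_ids):
        self.episode_length_buf[env_ids] = 0

    def step(self, actions):
        self.gym.simulate(self.sim)
        self.render()
        self.episode_length_buf += 1
        return self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras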
@@ -0,0 +1,707 @@
from legged_gym import LEGGED_GYM_ROOT_DIR, envs
from time import time
from warnings import WarningMessage
import numpy as np
import os

from isaacgym.torch_utils import *
from isaacgym import gymtorch, gymapi, gymutil

import torch
from torch import Tensor
from typing import Tuple, Dict

from legged_gym import LEGGED_GYM_ROOT_DIR
from legged_gym.envs.base.base_task import BaseTask
from legged_gym.utils.math import quat_apply_yaw, wrap_to_pi, torch_rand_sqrt_float
from legged_gym.utils.helpers import class_to_dict
from .legged_robot_config import LeggedRobotCfg

class LeggedRobot(BaseTask):
    def __init__(self, cfg: LeggedRobotCfg, sim_params, physics_engine, sim_device, headless):
        """ Parses the provided config file,
            calls create_sim() (which creates the simulation and environments),
            initializes pytorch buffers used during training

        Args:
            cfg (Dict): Environment config file
            sim_params (gymapi.SimParams): simulation parameters
            physics_engine (gymapi.SimType): gymapi.SIM_PHYSX (must be PhysX)
            device_type (string): 'cuda' or 'cpu'
            device_id (int): 0, 1, ...
            headless (bool): Run without rendering if True
        """
        self.cfg = cfg
        self.sim_params = sim_params
        self.height_samples = None
        self.debug_viz = False
        self.init_done = False
        self._parse_cfg(self.cfg)
        super().__init__(self.cfg, sim_params, physics_engine, sim_device, headless)

        if not self.headless:
            self.set_camera(self.cfg.viewer.pos, self.cfg.viewer.lookat)
        self._init_buffers()
        self._prepare_reward_function()
        self.init_done = True

    def step(self, actions):
        """ Apply actions, simulate, call self.post_physics_step()

        Args:
            actions (torch.Tensor): Tensor of shape (num_envs, num_actions_per_env)
        """
        clip_actions = self.cfg.normalization.clip_actions
        self.actions = torch.clip(actions, -clip_actions, clip_actions).to(self.device)
        # step physics and render each frame
        self.render()
        for _ in range(self.cfg.control.decimation):
            self.torques = self._compute_torques(self.actions).view(self.torques.shape)
            self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self.torques))
            self.gym.simulate(self.sim)
            if self.device == 'cpu':
                self.gym.fetch_results(self.sim, True)
            self.gym.refresh_dof_state_tensor(self.sim)
        self.post_physics_step()

        # return clipped obs, clipped states (None), rewards, dones and infos
        clip_obs = self.cfg.normalization.clip_observations
        self.obs_buf = torch.clip(self.obs_buf, -clip_obs, clip_obs)
        if self.privileged_obs_buf is not None:
            self.privileged_obs_buf = torch.clip(self.privileged_obs_buf, -clip_obs, clip_obs)
        return self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras

    def post_physics_step(self):
        """ check terminations, compute observations and rewards
            calls self._post_physics_step_callback() for common computations
            calls self._draw_debug_vis() if needed
        """
        self.gym.refresh_actor_root_state_tensor(self.sim)
        self.gym.refresh_net_contact_force_tensor(self.sim)

        self.episode_length_buf += 1
        self.common_step_counter += 1

        # prepare quantities
        self.base_pos[:] = self.root_states[:, 0:3]
        self.base_quat[:] = self.root_states[:, 3:7]
        self.base_lin_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10])
        self.base_ang_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])
        self.projected_gravity[:] = quat_rotate_inverse(self.base_quat, self.gravity_vec)

        self._post_physics_step_callback()

        # compute observations, rewards, resets, ...
        self.check_termination()
        self.compute_reward()
        env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
        self.reset_idx(env_ids)
        self.compute_observations() # in some cases a simulation step might be required to refresh some obs (for example body positions)

        self.last_actions[:] = self.actions[:]
        self.last_dof_vel[:] = self.dof_vel[:]
        self.last_root_vel[:] = self.root_states[:, 7:13]

    def check_termination(self):
        """ Check if environments need to be reset
        """
        self.reset_buf = torch.any(torch.norm(self.contact_forces[:, self.termination_contact_indices, :], dim=-1) > 1., dim=1)
        self.time_out_buf = self.episode_length_buf > self.max_episode_length # no terminal reward for time-outs
        self.reset_buf |= self.time_out_buf

    def reset_idx(self, env_ids):
        """ Reset some environments.
            Calls self._reset_dofs(env_ids), self._reset_root_states(env_ids), and self._resample_commands(env_ids)
            [Optional] calls self._update_terrain_curriculum(env_ids) and self.update_command_curriculum(env_ids),
            logs episode info,
            resets some buffers

        Args:
            env_ids (list[int]): List of environment ids which must be reset
        """
        if len(env_ids) == 0:
            return

        # reset robot states
        self._reset_dofs(env_ids)
        self._reset_root_states(env_ids)

        self._resample_commands(env_ids)

        # reset buffers
        self.last_actions[env_ids] = 0.
        self.last_dof_vel[env_ids] = 0.
        self.feet_air_time[env_ids] = 0.
        self.episode_length_buf[env_ids] = 0
        self.reset_buf[env_ids] = 1
        # fill extras
        self.extras["episode"] = {}
        for key in self.episode_sums.keys():
            self.extras["episode"]['rew_' + key] = torch.mean(self.episode_sums[key][env_ids]) / self.max_episode_length_s
            self.episode_sums[key][env_ids] = 0.
        if self.cfg.commands.curriculum:
            self.extras["episode"]["max_command_x"] = self.command_ranges["lin_vel_x"][1]
        # send timeout info to the algorithm
        if self.cfg.env.send_timeouts:
            self.extras["time_outs"] = self.time_out_buf

    def compute_reward(self):
        """ Compute rewards
            Calls each reward function which had a non-zero scale (processed in self._prepare_reward_function())
            adds each term to the episode sums and to the total reward
        """
        self.rew_buf[:] = 0.
        for i in range(len(self.reward_functions)):
            name = self.reward_names[i]
            rew = self.reward_functions[i]() * self.reward_scales[name]
            self.rew_buf += rew
            self.episode_sums[name] += rew
        if self.cfg.rewards.only_positive_rewards:
            self.rew_buf[:] = torch.clip(self.rew_buf[:], min=0.)
        # add termination reward after clipping
        if "termination" in self.reward_scales:
            rew = self._reward_termination() * self.reward_scales["termination"]
            self.rew_buf += rew
            self.episode_sums["termination"] += rew

    def compute_observations(self):
        """ Computes observations
        """
        self.obs_buf = torch.cat((  self.base_lin_vel * self.obs_scales.lin_vel,
                                    self.base_ang_vel * self.obs_scales.ang_vel,
                                    self.projected_gravity,
                                    self.commands[:, :3] * self.commands_scale,
                                    (self.dof_pos - self.default_dof_pos) * self.obs_scales.dof_pos,
                                    self.dof_vel * self.obs_scales.dof_vel,
                                    self.actions
                                    ), dim=-1)
        # add perceptive inputs if not blind
        # add noise if needed
        if self.add_noise:
            self.obs_buf += (2 * torch.rand_like(self.obs_buf) - 1) * self.noise_scale_vec

    def create_sim(self):
        """ Creates simulation, terrain and environments
        """
        self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly
        self.sim = self.gym.create_sim(self.sim_device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
        self._create_ground_plane()
        self._create_envs()

    def set_camera(self, position, lookat):
        """ Set camera position and direction
        """
        cam_pos = gymapi.Vec3(position[0], position[1], position[2])
        cam_target = gymapi.Vec3(lookat[0], lookat[1], lookat[2])
        self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)

    #------------- Callbacks --------------
    def _process_rigid_shape_props(self, props, env_id):
        """ Callback allowing to store/change/randomize the rigid shape properties of each environment.
            Called during environment creation.
            Base behavior: randomizes the friction of each environment

        Args:
            props (List[gymapi.RigidShapeProperties]): Properties of each shape of the asset
            env_id (int): Environment id

        Returns:
            [List[gymapi.RigidShapeProperties]]: Modified rigid shape properties
        """
        if self.cfg.domain_rand.randomize_friction:
            if env_id==0:
                # prepare friction randomization
                friction_range = self.cfg.domain_rand.friction_range
                num_buckets = 64
                bucket_ids = torch.randint(0, num_buckets, (self.num_envs, 1))
                friction_buckets = torch_rand_float(friction_range[0], friction_range[1], (num_buckets,1), device='cpu')
                self.friction_coeffs = friction_buckets[bucket_ids]

            for s in range(len(props)):
                props[s].friction = self.friction_coeffs[env_id]
        return props

    def _process_dof_props(self, props, env_id):
        """ Callback allowing to store/change/randomize the DOF properties of each environment.
            Called during environment creation.
            Base behavior: stores position, velocity and torques limits defined in the URDF

        Args:
            props (numpy.array): Properties of each DOF of the asset
            env_id (int): Environment id

        Returns:
            [numpy.array]: Modified DOF properties
        """
        if env_id==0:
            self.dof_pos_limits = torch.zeros(self.num_dof, 2, dtype=torch.float, device=self.device, requires_grad=False)
            self.dof_vel_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
            self.torque_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
            for i in range(len(props)):
                self.dof_pos_limits[i, 0] = props["lower"][i].item()
                self.dof_pos_limits[i, 1] = props["upper"][i].item()
                self.dof_vel_limits[i] = props["velocity"][i].item()
                self.torque_limits[i] = props["effort"][i].item()
                # soft limits
                m = (self.dof_pos_limits[i, 0] + self.dof_pos_limits[i, 1]) / 2
                r = self.dof_pos_limits[i, 1] - self.dof_pos_limits[i, 0]
                self.dof_pos_limits[i, 0] = m - 0.5 * r * self.cfg.rewards.soft_dof_pos_limit
                self.dof_pos_limits[i, 1] = m + 0.5 * r * self.cfg.rewards.soft_dof_pos_limit
        return props

    def _process_rigid_body_props(self, props, env_id):
        # if env_id==0:
        #     sum = 0
        #     for i, p in enumerate(props):
        #         sum += p.mass
        #         print(f"Mass of body {i}: {p.mass} (before randomization)")
        #     print(f"Total mass {sum} (before randomization)")
        # randomize base mass
        if self.cfg.domain_rand.randomize_base_mass:
            rng = self.cfg.domain_rand.added_mass_range
            props[0].mass += np.random.uniform(rng[0], rng[1])
        return props

    def _post_physics_step_callback(self):
        """ Callback called before computing terminations, rewards, and observations
            Default behaviour: Compute ang vel command based on target and heading, compute measured terrain heights and randomly push robots
        """
        env_ids = (self.episode_length_buf % int(self.cfg.commands.resampling_time / self.dt)==0).nonzero(as_tuple=False).flatten()
        self._resample_commands(env_ids)
        if self.cfg.commands.heading_command:
            forward = quat_apply(self.base_quat, self.forward_vec)
            heading = torch.atan2(forward[:, 1], forward[:, 0])
            self.commands[:, 2] = torch.clip(0.5*wrap_to_pi(self.commands[:, 3] - heading), -1., 1.)

        if self.cfg.domain_rand.push_robots and (self.common_step_counter % self.cfg.domain_rand.push_interval == 0):
            self._push_robots()

    def _resample_commands(self, env_ids):
        """ Randomly select commands of some environments

        Args:
            env_ids (List[int]): Environment ids for which new commands are needed
        """
        self.commands[env_ids, 0] = torch_rand_float(self.command_ranges["lin_vel_x"][0], self.command_ranges["lin_vel_x"][1], (len(env_ids), 1), device=self.device).squeeze(1)
        self.commands[env_ids, 1] = torch_rand_float(self.command_ranges["lin_vel_y"][0], self.command_ranges["lin_vel_y"][1], (len(env_ids), 1), device=self.device).squeeze(1)
        if self.cfg.commands.heading_command:
            self.commands[env_ids, 3] = torch_rand_float(self.command_ranges["heading"][0], self.command_ranges["heading"][1], (len(env_ids), 1), device=self.device).squeeze(1)
        else:
            self.commands[env_ids, 2] = torch_rand_float(self.command_ranges["ang_vel_yaw"][0], self.command_ranges["ang_vel_yaw"][1], (len(env_ids), 1), device=self.device).squeeze(1)

        # set small commands to zero
        self.commands[env_ids, :2] *= (torch.norm(self.commands[env_ids, :2], dim=1) > 0.2).unsqueeze(1)

    def _compute_torques(self, actions):
        """ Compute torques from actions.
            Actions can be interpreted as position or velocity targets given to a PD controller, or directly as scaled torques.
            [NOTE]: torques must have the same dimension as the number of DOFs, even if some DOFs are not actuated.

        Args:
            actions (torch.Tensor): Actions

        Returns:
            [torch.Tensor]: Torques sent to the simulation
        """
        # PD controller
        actions_scaled = actions * self.cfg.control.action_scale
        control_type = self.cfg.control.control_type
        if control_type=="P":
            torques = self.p_gains*(actions_scaled + self.default_dof_pos - self.dof_pos) - self.d_gains*self.dof_vel
        elif control_type=="V":
            torques = self.p_gains*(actions_scaled - self.dof_vel) - self.d_gains*(self.dof_vel - self.last_dof_vel)/self.sim_params.dt
        elif control_type=="T":
            torques = actions_scaled
        else:
            raise NameError(f"Unknown controller type: {control_type}")
        return torch.clip(torques, -self.torque_limits, self.torque_limits)

    def _reset_dofs(self, env_ids):
        """ Resets DOF position and velocities of selected environments
            Positions are randomly selected within 0.5:1.5 x default positions.
            Velocities are set to zero.

        Args:
            env_ids (List[int]): Environment ids
        """
        self.dof_pos[env_ids] = self.default_dof_pos * torch_rand_float(0.5, 1.5, (len(env_ids), self.num_dof), device=self.device)
        self.dof_vel[env_ids] = 0.

        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_dof_state_tensor_indexed(self.sim,
                                              gymtorch.unwrap_tensor(self.dof_state),
                                              gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))

    def _reset_root_states(self, env_ids):
        """ Resets ROOT states position and velocities of selected environments
            Sets base position based on the curriculum
            Selects randomized base velocities within -0.5:0.5 [m/s, rad/s]
        Args:
            env_ids (List[int]): Environment ids
        """
        # base position
        if self.custom_origins:
            self.root_states[env_ids] = self.base_init_state
            self.root_states[env_ids, :3] += self.env_origins[env_ids]
            self.root_states[env_ids, :2] += torch_rand_float(-1., 1., (len(env_ids), 2), device=self.device) # xy position within 1m of the center
        else:
            self.root_states[env_ids] = self.base_init_state
            self.root_states[env_ids, :3] += self.env_origins[env_ids]
        # base velocities
        self.root_states[env_ids, 7:13] = torch_rand_float(-0.5, 0.5, (len(env_ids), 6), device=self.device) # [7:10]: lin vel, [10:13]: ang vel
        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_actor_root_state_tensor_indexed(self.sim,
                                                     gymtorch.unwrap_tensor(self.root_states),
                                                     gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))

    def _push_robots(self):
        """ Randomly pushes the robots. Emulates an impulse by setting a randomized base velocity.
        """
        max_vel = self.cfg.domain_rand.max_push_vel_xy
        self.root_states[:, 7:9] = torch_rand_float(-max_vel, max_vel, (self.num_envs, 2), device=self.device) # lin vel x/y
        self.gym.set_actor_root_state_tensor(self.sim, gymtorch.unwrap_tensor(self.root_states))

    def update_command_curriculum(self, env_ids):
        """ Implements a curriculum of increasing commands

        Args:
            env_ids (List[int]): ids of environments being reset
        """
        # If the tracking reward is above 80% of the maximum, increase the range of commands
        if torch.mean(self.episode_sums["tracking_lin_vel"][env_ids]) / self.max_episode_length > 0.8 * self.reward_scales["tracking_lin_vel"]:
            self.command_ranges["lin_vel_x"][0] = np.clip(self.command_ranges["lin_vel_x"][0] - 0.5, -self.cfg.commands.max_curriculum, 0.)
            self.command_ranges["lin_vel_x"][1] = np.clip(self.command_ranges["lin_vel_x"][1] + 0.5, 0., self.cfg.commands.max_curriculum)

    def _get_noise_scale_vec(self, cfg):
        """ Sets a vector used to scale the noise added to the observations.
            [NOTE]: Must be adapted when changing the observations structure

        Args:
            cfg (Dict): Environment config file

        Returns:
            [torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1]
        """
        noise_vec = torch.zeros_like(self.obs_buf[0])
        self.add_noise = self.cfg.noise.add_noise
        noise_scales = self.cfg.noise.noise_scales
        noise_level = self.cfg.noise.noise_level
        noise_vec[:3] = noise_scales.lin_vel * noise_level * self.obs_scales.lin_vel
        noise_vec[3:6] = noise_scales.ang_vel * noise_level * self.obs_scales.ang_vel
        noise_vec[6:9] = noise_scales.gravity * noise_level
        noise_vec[9:12] = 0. # commands
        noise_vec[12:24] = noise_scales.dof_pos * noise_level * self.obs_scales.dof_pos
        noise_vec[24:36] = noise_scales.dof_vel * noise_level * self.obs_scales.dof_vel
        noise_vec[36:48] = 0. # previous actions

        return noise_vec

    #----------------------------------------
    def _init_buffers(self):
        """ Initialize torch tensors which will contain simulation states and processed quantities
        """
        # get gym GPU state tensors
        actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
        dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
        net_contact_forces = self.gym.acquire_net_contact_force_tensor(self.sim)
        self.gym.refresh_dof_state_tensor(self.sim)
        self.gym.refresh_actor_root_state_tensor(self.sim)
        self.gym.refresh_net_contact_force_tensor(self.sim)

        # create some wrapper tensors for different slices
        self.root_states = gymtorch.wrap_tensor(actor_root_state)
        self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
        self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0]
        self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]
        self.base_quat = self.root_states[:, 3:7]
        self.base_pos = self.root_states[:self.num_envs, 0:3]
        self.contact_forces = gymtorch.wrap_tensor(net_contact_forces).view(self.num_envs, -1, 3) # shape: num_envs, num_bodies, xyz axis

        # initialize some data used later on
        self.common_step_counter = 0
        self.extras = {}
        self.noise_scale_vec = self._get_noise_scale_vec(self.cfg)
        self.gravity_vec = to_torch(get_axis_params(-1., self.up_axis_idx), device=self.device).repeat((self.num_envs, 1))
        self.forward_vec = to_torch([1., 0., 0.], device=self.device).repeat((self.num_envs, 1))
        self.torques = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
        self.p_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
        self.d_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
        self.actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
        self.last_actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
        self.last_dof_vel = torch.zeros_like(self.dof_vel)
        self.last_root_vel = torch.zeros_like(self.root_states[:, 7:13])
        self.commands = torch.zeros(self.num_envs, self.cfg.commands.num_commands, dtype=torch.float, device=self.device, requires_grad=False) # x vel, y vel, yaw vel, heading
        self.commands_scale = torch.tensor([self.obs_scales.lin_vel, self.obs_scales.lin_vel, self.obs_scales.ang_vel], device=self.device, requires_grad=False,) # TODO change this
        self.feet_air_time = torch.zeros(self.num_envs, self.feet_indices.shape[0], dtype=torch.float, device=self.device, requires_grad=False)
        self.last_contacts = torch.zeros(self.num_envs, len(self.feet_indices), dtype=torch.bool, device=self.device, requires_grad=False)
        self.base_lin_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10])
        self.base_ang_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])
        self.projected_gravity = quat_rotate_inverse(self.base_quat, self.gravity_vec)

        # joint positions offsets and PD gains
        self.default_dof_pos = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
        for i in range(self.num_dofs):
            name = self.dof_names[i]
            angle = self.cfg.init_state.default_joint_angles[name]
            self.default_dof_pos[i] = angle
            found = False
            for dof_name in self.cfg.control.stiffness.keys():
                if dof_name in name:
                    self.p_gains[i] = self.cfg.control.stiffness[dof_name]
                    self.d_gains[i] = self.cfg.control.damping[dof_name]
                    found = True
            if not found:
                self.p_gains[i] = 0.
                self.d_gains[i] = 0.
                if self.cfg.control.control_type in ["P", "V"]:
                    print(f"PD gains of joint {name} were not defined, setting them to zero")
        self.default_dof_pos = self.default_dof_pos.unsqueeze(0)

    def _prepare_reward_function(self):
        """ Prepares a list of reward functions, which will be called to compute the total reward.
            Looks for self._reward_<REWARD_NAME>, where <REWARD_NAME> are names of all non zero reward scales in the cfg.
        """
        # remove zero scales + multiply non-zero ones by dt
        for key in list(self.reward_scales.keys()):
            scale = self.reward_scales[key]
            if scale==0:
                self.reward_scales.pop(key)
            else:
                self.reward_scales[key] *= self.dt
        # prepare list of functions
        self.reward_functions = []
        self.reward_names = []
        for name, scale in self.reward_scales.items():
            if name=="termination":
                continue
            self.reward_names.append(name)
            name = '_reward_' + name
            self.reward_functions.append(getattr(self, name))

        # reward episode sums
        self.episode_sums = {name: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False)
                             for name in self.reward_scales.keys()}

    def _create_ground_plane(self):
        """ Adds a ground plane to the simulation, sets friction and restitution based on the cfg.
        """
        plane_params = gymapi.PlaneParams()
        plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
        plane_params.static_friction = self.cfg.terrain.static_friction
        plane_params.dynamic_friction = self.cfg.terrain.dynamic_friction
        plane_params.restitution = self.cfg.terrain.restitution
        self.gym.add_ground(self.sim, plane_params)

    def _create_envs(self):
        """ Creates environments:
            1. loads the robot URDF/MJCF asset,
            2. For each environment
                2.1 creates the environment,
                2.2 calls DOF and Rigid shape properties callbacks,
                2.3 creates the actor with these properties and adds it to the env
            3. Stores indices of different bodies of the robot
        """
        asset_path = self.cfg.asset.file.format(LEGGED_GYM_ROOT_DIR=LEGGED_GYM_ROOT_DIR)
        asset_root = os.path.dirname(asset_path)
        asset_file = os.path.basename(asset_path)

        asset_options = gymapi.AssetOptions()
        asset_options.default_dof_drive_mode = self.cfg.asset.default_dof_drive_mode
        asset_options.collapse_fixed_joints = self.cfg.asset.collapse_fixed_joints
        asset_options.replace_cylinder_with_capsule = self.cfg.asset.replace_cylinder_with_capsule
        asset_options.flip_visual_attachments = self.cfg.asset.flip_visual_attachments
        asset_options.fix_base_link = self.cfg.asset.fix_base_link
        asset_options.density = self.cfg.asset.density
        asset_options.angular_damping = self.cfg.asset.angular_damping
        asset_options.linear_damping = self.cfg.asset.linear_damping
        asset_options.max_angular_velocity = self.cfg.asset.max_angular_velocity
        asset_options.max_linear_velocity = self.cfg.asset.max_linear_velocity
        asset_options.armature = self.cfg.asset.armature
        asset_options.thickness = self.cfg.asset.thickness
        asset_options.disable_gravity = self.cfg.asset.disable_gravity

        robot_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
        self.num_dof = self.gym.get_asset_dof_count(robot_asset)
        self.num_bodies = self.gym.get_asset_rigid_body_count(robot_asset)
        dof_props_asset = self.gym.get_asset_dof_properties(robot_asset)
        rigid_shape_props_asset = self.gym.get_asset_rigid_shape_properties(robot_asset)

        # save body names from the asset
        body_names = self.gym.get_asset_rigid_body_names(robot_asset)
        self.dof_names = self.gym.get_asset_dof_names(robot_asset)
        self.num_bodies = len(body_names)
        self.num_dofs = len(self.dof_names)
        feet_names = [s for s in body_names if self.cfg.asset.foot_name in s]
        penalized_contact_names = []
        for name in self.cfg.asset.penalize_contacts_on:
            penalized_contact_names.extend([s for s in body_names if name in s])
        termination_contact_names = []
        for name in self.cfg.asset.terminate_after_contacts_on:
            termination_contact_names.extend([s for s in body_names if name in s])

        base_init_state_list = self.cfg.init_state.pos + self.cfg.init_state.rot + self.cfg.init_state.lin_vel + self.cfg.init_state.ang_vel
        self.base_init_state = to_torch(base_init_state_list, device=self.device, requires_grad=False)
        start_pose = gymapi.Transform()
        start_pose.p = gymapi.Vec3(*self.base_init_state[:3])

        self._get_env_origins()
        env_lower = gymapi.Vec3(0., 0., 0.)
        env_upper = gymapi.Vec3(0., 0., 0.)
        self.actor_handles = []
        self.envs = []
        for i in range(self.num_envs):
            # create env instance
            env_handle = self.gym.create_env(self.sim, env_lower, env_upper, int(np.sqrt(self.num_envs)))
            pos = self.env_origins[i].clone()
            pos[:2] += torch_rand_float(-1., 1., (2,1), device=self.device).squeeze(1)
            start_pose.p = gymapi.Vec3(*pos)

            rigid_shape_props = self._process_rigid_shape_props(rigid_shape_props_asset, i)
            self.gym.set_asset_rigid_shape_properties(robot_asset, rigid_shape_props)
            actor_handle = self.gym.create_actor(env_handle, robot_asset, start_pose, self.cfg.asset.name, i, self.cfg.asset.self_collisions, 0)
            dof_props = self._process_dof_props(dof_props_asset, i)
            self.gym.set_actor_dof_properties(env_handle, actor_handle, dof_props)
            body_props = self.gym.get_actor_rigid_body_properties(env_handle, actor_handle)
            body_props = self._process_rigid_body_props(body_props, i)
            self.gym.set_actor_rigid_body_properties(env_handle, actor_handle, body_props, recomputeInertia=True)
            self.envs.append(env_handle)
            self.actor_handles.append(actor_handle)

        self.feet_indices = torch.zeros(len(feet_names), dtype=torch.long, device=self.device, requires_grad=False)
        for i in range(len(feet_names)):
            self.feet_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], feet_names[i])

        self.penalised_contact_indices = torch.zeros(len(penalized_contact_names), dtype=torch.long, device=self.device, requires_grad=False)
        for i in range(len(penalized_contact_names)):
            self.penalised_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], penalized_contact_names[i])

        self.termination_contact_indices = torch.zeros(len(termination_contact_names), dtype=torch.long, device=self.device, requires_grad=False)
        for i in range(len(termination_contact_names)):
            self.termination_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], termination_contact_names[i])

    def _get_env_origins(self):
        """ Sets environment origins. On rough terrain the origins are defined by the terrain platforms.
            Otherwise create a grid.
        """
        self.custom_origins = False
        self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False)
        # create a grid of robots
        num_cols = np.floor(np.sqrt(self.num_envs))
        num_rows = np.ceil(self.num_envs / num_cols)
        xx, yy = torch.meshgrid(torch.arange(num_rows), torch.arange(num_cols))
        spacing = self.cfg.env.env_spacing
        self.env_origins[:, 0] = spacing * xx.flatten()[:self.num_envs]
        self.env_origins[:, 1] = spacing * yy.flatten()[:self.num_envs]
        self.env_origins[:, 2] = 0.

    def _parse_cfg(self, cfg):
        self.dt = self.cfg.control.decimation * self.sim_params.dt
        self.obs_scales = self.cfg.normalization.obs_scales
        self.reward_scales = class_to_dict(self.cfg.rewards.scales)
        self.command_ranges = class_to_dict(self.cfg.commands.ranges)

        self.max_episode_length_s = self.cfg.env.episode_length_s
        self.max_episode_length = np.ceil(self.max_episode_length_s / self.dt)

        self.cfg.domain_rand.push_interval = np.ceil(self.cfg.domain_rand.push_interval_s / self.dt)

    #------------ reward functions----------------
    def _reward_lin_vel_z(self):
        # Penalize z axis base linear velocity
        return torch.square(self.base_lin_vel[:, 2])

    def _reward_ang_vel_xy(self):
        # Penalize xy axes base angular velocity
        return torch.sum(torch.square(self.base_ang_vel[:, :2]), dim=1)

    def _reward_orientation(self):
        # Penalize non flat base orientation
        return torch.sum(torch.square(self.projected_gravity[:, :2]), dim=1)

    def _reward_base_height(self):
        # Penalize base height away from target
        base_height = torch.mean(self.root_states[:, 2].unsqueeze(1) - self.measured_heights, dim=1)
        return torch.square(base_height - self.cfg.rewards.base_height_target)

    def _reward_torques(self):
        # Penalize torques
        return torch.sum(torch.square(self.torques), dim=1)

    def _reward_dof_vel(self):
        # Penalize dof velocities
        return torch.sum(torch.square(self.dof_vel), dim=1)

    def _reward_dof_acc(self):
        # Penalize dof accelerations
        return torch.sum(torch.square((self.last_dof_vel - self.dof_vel) / self.dt), dim=1)

    def _reward_action_rate(self):
        # Penalize changes in actions
        return torch.sum(torch.square(self.last_actions - self.actions), dim=1)

    def _reward_collision(self):
        # Penalize collisions on selected bodies
        return torch.sum(1.*(torch.norm(self.contact_forces[:, self.penalised_contact_indices, :], dim=-1) > 0.1), dim=1)

    def _reward_termination(self):
        # Terminal reward / penalty
        return self.reset_buf * ~self.time_out_buf

    def _reward_dof_pos_limits(self):
        # Penalize dof positions too close to the limit
        out_of_limits = -(self.dof_pos - self.dof_pos_limits[:, 0]).clip(max=0.) # lower limit
        out_of_limits += (self.dof_pos - self.dof_pos_limits[:, 1]).clip(min=0.)
        return torch.sum(out_of_limits, dim=1)

    def _reward_dof_vel_limits(self):
        # Penalize dof velocities too close to the limit
        # clip to max error = 1 rad/s per joint to avoid huge penalties
        return torch.sum((torch.abs(self.dof_vel) - self.dof_vel_limits*self.cfg.rewards.soft_dof_vel_limit).clip(min=0., max=1.), dim=1)

    def _reward_torque_limits(self):
        # penalize torques too close to the limit
        return torch.sum((torch.abs(self.torques) - self.torque_limits*self.cfg.rewards.soft_torque_limit).clip(min=0.), dim=1)

    def _reward_tracking_lin_vel(self):
        # Tracking of linear velocity commands (xy axes)
        lin_vel_error = torch.sum(torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1)
        return torch.exp(-lin_vel_error/self.cfg.rewards.tracking_sigma)

    def _reward_tracking_ang_vel(self):
        # Tracking of angular velocity commands (yaw)
        ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2])
        return torch.exp(-ang_vel_error/self.cfg.rewards.tracking_sigma)

    def _reward_feet_air_time(self):
        # Reward long steps
        # Need to filter the contacts because the contact reporting of PhysX is unreliable on meshes
        contact = self.contact_forces[:, self.feet_indices, 2] > 1.
        contact_filt = torch.logical_or(contact, self.last_contacts)
        self.last_contacts = contact
        first_contact = (self.feet_air_time > 0.) * contact_filt
        self.feet_air_time += self.dt
        rew_airTime = torch.sum((self.feet_air_time - 0.5) * first_contact, dim=1) # reward only on first contact with the ground
        rew_airTime *= torch.norm(self.commands[:, :2], dim=1) > 0.1 # no reward for zero command
        self.feet_air_time *= ~contact_filt
        return rew_airTime

    def _reward_stumble(self):
        # Penalize feet hitting vertical surfaces
        return torch.any(torch.norm(self.contact_forces[:, self.feet_indices, :2], dim=2) >
                         5 * torch.abs(self.contact_forces[:, self.feet_indices, 2]), dim=1)

    def _reward_stand_still(self):
        # Penalize motion at zero commands
        return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1) * (torch.norm(self.commands[:, :2], dim=1) < 0.1)

    def _reward_feet_contact_forces(self):
        # penalize high contact forces
        return torch.sum((torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force).clip(min=0.), dim=1)
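Two things in the class above are easy to cross-check against the config that follows: the observation layout in compute_observations() and the PD law in _compute_torques(). A small sanity-check sketch (not taken from the repository; it assumes a 12-DOF robot such as Go2):

# Observation layout, matching num_observations = 48 in the config below:
num_dof = 12
obs_dim = 3 + 3 + 3 + 3 + num_dof + num_dof + num_dof
#        lin_vel  ang_vel  gravity  commands  dof_pos  dof_vel  last actions
assert obs_dim == 48

# "P" control branch, per joint:
#   tau = p_gain * (action_scale * action + default_dof_pos - dof_pos) - d_gain * dof_vel
# With the Go2 values further down (p_gain=20, d_gain=0.5, action_scale=0.25), an action
# of 1.0 at the default pose with zero joint velocity requests 20 * 0.25 = 5 N*m,
# which is then clipped to the URDF effort limit.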
@@ -0,0 +1,214 @@
from .base_config import BaseConfig

class LeggedRobotCfg(BaseConfig):
    class env:
        num_envs = 4096
        num_observations = 48
        num_privileged_obs = None # if not None a privileged_obs_buf will be returned by step() (critic obs for asymmetric training). None is returned otherwise
        num_actions = 12
        env_spacing = 3.  # not used with heightfields/trimeshes
        send_timeouts = True # send time out information to the algorithm
        episode_length_s = 20 # episode length in seconds

    class terrain:
        mesh_type = 'plane' # "heightfield" # none, plane, heightfield or trimesh
        horizontal_scale = 0.1 # [m]
        vertical_scale = 0.005 # [m]
        border_size = 25 # [m]
        curriculum = True
        static_friction = 1.0
        dynamic_friction = 1.0
        restitution = 0.
        # rough terrain only:
        measure_heights = True
        measured_points_x = [-0.8, -0.7, -0.6, -0.5, -0.4, -0.3, -0.2, -0.1, 0., 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8] # 1m x 1.6m rectangle (without center line)
        measured_points_y = [-0.5, -0.4, -0.3, -0.2, -0.1, 0., 0.1, 0.2, 0.3, 0.4, 0.5]
        selected = False # select a unique terrain type and pass all arguments
        terrain_kwargs = None # Dict of arguments for selected terrain
        max_init_terrain_level = 5 # starting curriculum state
        terrain_length = 8.
        terrain_width = 8.
        num_rows = 10 # number of terrain rows (levels)
        num_cols = 20 # number of terrain cols (types)
        # terrain types: [smooth slope, rough slope, stairs up, stairs down, discrete]
        terrain_proportions = [0.1, 0.1, 0.35, 0.25, 0.2]
        # trimesh only:
        slope_treshold = 0.75 # slopes above this threshold will be corrected to vertical surfaces

    class commands:
        curriculum = False
        max_curriculum = 1.
        num_commands = 4 # default: lin_vel_x, lin_vel_y, ang_vel_yaw, heading (in heading mode ang_vel_yaw is recomputed from heading error)
        resampling_time = 10. # time before commands are changed [s]
        heading_command = True # if true: compute ang vel command from heading error
        class ranges:
            lin_vel_x = [-1.0, 1.0] # min max [m/s]
            lin_vel_y = [-1.0, 1.0] # min max [m/s]
            ang_vel_yaw = [-1, 1]   # min max [rad/s]
            heading = [-3.14, 3.14]

    class init_state:
        pos = [0.0, 0.0, 1.] # x,y,z [m]
        rot = [0.0, 0.0, 0.0, 1.0] # x,y,z,w [quat]
        lin_vel = [0.0, 0.0, 0.0] # x,y,z [m/s]
        ang_vel = [0.0, 0.0, 0.0] # x,y,z [rad/s]
        default_joint_angles = { # target angles when action = 0.0
            "joint_a": 0.,
            "joint_b": 0.}

    class control:
        control_type = 'P' # P: position, V: velocity, T: torques
        # PD Drive parameters:
        stiffness = {'joint_a': 10.0, 'joint_b': 15.} # [N*m/rad]
        damping = {'joint_a': 1.0, 'joint_b': 1.5}    # [N*m*s/rad]
        # action scale: target angle = actionScale * action + defaultAngle
        action_scale = 0.5
        # decimation: Number of control action updates @ sim DT per policy DT
        decimation = 4

    class asset:
        file = ""
        name = "legged_robot" # actor name
        foot_name = "None" # name of the feet bodies, used to index body state and contact force tensors
        penalize_contacts_on = []
        terminate_after_contacts_on = []
        disable_gravity = False
        collapse_fixed_joints = True # merge bodies connected by fixed joints. Specific fixed joints can be kept by adding <... dont_collapse="true">
        fix_base_link = False # fix the base of the robot
        default_dof_drive_mode = 3 # see GymDofDriveModeFlags (0 is none, 1 is pos tgt, 2 is vel tgt, 3 effort)
        self_collisions = 0 # 1 to disable, 0 to enable...bitwise filter
        replace_cylinder_with_capsule = True # replace collision cylinders with capsules, leads to faster/more stable simulation
        flip_visual_attachments = True # Some .obj meshes must be flipped from y-up to z-up

        density = 0.001
        angular_damping = 0.
        linear_damping = 0.
        max_angular_velocity = 1000.
        max_linear_velocity = 1000.
        armature = 0.
        thickness = 0.01

    class domain_rand:
        randomize_friction = True
        friction_range = [0.5, 1.25]
        randomize_base_mass = False
        added_mass_range = [-1., 1.]
        push_robots = True
        push_interval_s = 15
        max_push_vel_xy = 1.

    class rewards:
        class scales:
            termination = -0.0
            tracking_lin_vel = 1.0
            tracking_ang_vel = 0.5
            lin_vel_z = -2.0
            ang_vel_xy = -0.05
            orientation = -0.
            torques = -0.00001
            dof_vel = -0.
            dof_acc = -2.5e-7
            base_height = -0.
            feet_air_time = 1.0
            collision = -1.
            feet_stumble = -0.0
            action_rate = -0.01
            stand_still = -0.

        only_positive_rewards = True # if true negative total rewards are clipped at zero (avoids early termination problems)
        tracking_sigma = 0.25 # tracking reward = exp(-error^2/sigma)
        soft_dof_pos_limit = 1. # percentage of urdf limits, values above this limit are penalized
        soft_dof_vel_limit = 1.
        soft_torque_limit = 1.
        base_height_target = 1.
        max_contact_force = 100. # forces above this value are penalized

    class normalization:
        class obs_scales:
            lin_vel = 2.0
            ang_vel = 0.25
            dof_pos = 1.0
            dof_vel = 0.05
            height_measurements = 5.0
        clip_observations = 100.
        clip_actions = 100.

    class noise:
        add_noise = True
        noise_level = 1.0 # scales other values
        class noise_scales:
            dof_pos = 0.01
            dof_vel = 1.5
            lin_vel = 0.1
            ang_vel = 0.2
            gravity = 0.05
            height_measurements = 0.1

    # viewer camera:
    class viewer:
        ref_env = 0
        pos = [10, 0, 6]      # [m]
        lookat = [11., 5, 3.] # [m]

    class sim:
        dt = 0.005
        substeps = 1
        gravity = [0., 0., -9.81] # [m/s^2]
        up_axis = 1 # 0 is y, 1 is z

        class physx:
            num_threads = 10
            solver_type = 1 # 0: pgs, 1: tgs
            num_position_iterations = 4
            num_velocity_iterations = 0
            contact_offset = 0.01 # [m]
            rest_offset = 0.0     # [m]
            bounce_threshold_velocity = 0.5 # 0.5 [m/s]
            max_depenetration_velocity = 1.0
            max_gpu_contact_pairs = 2**23 # 2**24 -> needed for 8000 envs and more
            default_buffer_size_multiplier = 5
            contact_collection = 2 # 0: never, 1: last sub-step, 2: all sub-steps (default=2)

class LeggedRobotCfgPPO(BaseConfig):
    seed = 1
    runner_class_name = 'OnPolicyRunner'
    class policy:
        init_noise_std = 1.0
        actor_hidden_dims = [512, 256, 128]
        critic_hidden_dims = [512, 256, 128]
        activation = 'elu' # can be elu, relu, selu, crelu, lrelu, tanh, sigmoid
        # only for 'ActorCriticRecurrent':
        # rnn_type = 'lstm'
        # rnn_hidden_size = 512
        # rnn_num_layers = 1

    class algorithm:
        # training params
        value_loss_coef = 1.0
        use_clipped_value_loss = True
        clip_param = 0.2
        entropy_coef = 0.01
        num_learning_epochs = 5
        num_mini_batches = 4 # mini batch size = num_envs*nsteps / nminibatches
        learning_rate = 1.e-3 # 5.e-4
        schedule = 'adaptive' # could be adaptive, fixed
        gamma = 0.99
        lam = 0.95
        desired_kl = 0.01
        max_grad_norm = 1.

    class runner:
        policy_class_name = 'ActorCritic'
        algorithm_class_name = 'PPO'
        num_steps_per_env = 24 # per iteration
        max_iterations = 1500 # number of policy updates

        # logging
        save_interval = 50 # check for potential saves every this many iterations
        experiment_name = 'test'
        run_name = ''
        # load and resume
        resume = False
        load_run = -1 # -1 = last run
        checkpoint = -1 # -1 = last saved model
        resume_path = None # updated from load_run and chkpt
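Reward terms are wired up purely by name: _prepare_reward_function() pairs every non-zero entry of rewards.scales with a method called _reward_<name> and pre-multiplies the scale by dt. A hedged sketch of adding a term this way (hip_deviation and the hip DOF indices are invented for illustration and depend on the URDF joint order):

import torch
from legged_gym.envs.base.legged_robot import LeggedRobot
from legged_gym.envs.base.legged_robot_config import LeggedRobotCfg

class MyRobotCfg(LeggedRobotCfg):
    class rewards(LeggedRobotCfg.rewards):
        class scales(LeggedRobotCfg.rewards.scales):
            hip_deviation = -0.1      # new non-zero scale -> term becomes active

class MyRobot(LeggedRobot):
    def _reward_hip_deviation(self):
        # penalize hip joints drifting from their default angles
        hip_ids = [0, 3, 6, 9]        # assumed hip DOF indices
        return torch.sum(torch.abs(self.dof_pos[:, hip_ids] - self.default_dof_pos[:, hip_ids]), dim=1)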
@@ -0,0 +1,85 @@
# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
#
# 3. Neither the name of the copyright holder nor the names of its
#    contributors may be used to endorse or promote products derived from
#    this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Copyright (c) 2021 ETH Zurich, Nikita Rudin

from legged_gym.envs.base.legged_robot_config import LeggedRobotCfg, LeggedRobotCfgPPO


class GO2RoughCfg(LeggedRobotCfg):
    class init_state(LeggedRobotCfg.init_state):
        pos = [0.0, 0.0, 0.42]  # x, y, z [m]
        default_joint_angles = {  # = target angles [rad] when action = 0.0
            'FL_hip_joint': 0.1,    # [rad]
            'RL_hip_joint': 0.1,    # [rad]
            'FR_hip_joint': -0.1,   # [rad]
            'RR_hip_joint': -0.1,   # [rad]

            'FL_thigh_joint': 0.8,  # [rad]
            'RL_thigh_joint': 1.,   # [rad]
            'FR_thigh_joint': 0.8,  # [rad]
            'RR_thigh_joint': 1.,   # [rad]

            'FL_calf_joint': -1.5,  # [rad]
            'RL_calf_joint': -1.5,  # [rad]
            'FR_calf_joint': -1.5,  # [rad]
            'RR_calf_joint': -1.5,  # [rad]
        }

    class control(LeggedRobotCfg.control):
        # PD Drive parameters:
        control_type = 'P'
        stiffness = {'joint': 20.}  # [N*m/rad]
        damping = {'joint': 0.5}    # [N*m*s/rad]
        # action scale: target angle = actionScale * action + defaultAngle
        action_scale = 0.25
        # decimation: Number of control action updates @ sim DT per policy DT
        decimation = 4

    class asset(LeggedRobotCfg.asset):
        file = '{LEGGED_GYM_ROOT_DIR}/resources/robots/go2/urdf/go2.urdf'
        name = "go2"
        foot_name = "foot"
        penalize_contacts_on = ["thigh", "calf"]
        terminate_after_contacts_on = ["base"]
        self_collisions = 1  # 1 to disable, 0 to enable... bitwise filter

    class rewards(LeggedRobotCfg.rewards):
        soft_dof_pos_limit = 0.9
        base_height_target = 0.25

        class scales(LeggedRobotCfg.rewards.scales):
            torques = -0.0002
            dof_pos_limits = -10.0


class GO2RoughCfgPPO(LeggedRobotCfgPPO):
    class algorithm(LeggedRobotCfgPPO.algorithm):
        entropy_coef = 0.01

    class runner(LeggedRobotCfgPPO.runner):
        run_name = ''
        experiment_name = 'rough_go2'
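A hedged sketch of how this config pair is typically wired up, assuming the usual legged_gym registration pattern; the registering module and exact import paths are not visible in this excerpt, so treat the paths and the LeggedRobot task class as assumptions:

    from legged_gym.utils.task_registry import task_registry
    from legged_gym.envs.base.legged_robot import LeggedRobot            # assumed task class
    from legged_gym.envs.go2.go2_config import GO2RoughCfg, GO2RoughCfgPPO  # assumed module path

    # "go2" becomes the value accepted by --task in the train/play scripts below
    task_registry.register("go2", LeggedRobot, GO2RoughCfg(), GO2RoughCfgPPO())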
@@ -0,0 +1,48 @@
import sys
sys.path.append("/home/unitree/unitree_rl_gym")
from legged_gym import LEGGED_GYM_ROOT_DIR
import os

import isaacgym
from legged_gym.envs import *
from legged_gym.utils import get_args, export_policy_as_jit, task_registry, Logger

import numpy as np
import torch


def play(args):
    env_cfg, train_cfg = task_registry.get_cfgs(name=args.task)
    # override some parameters for testing
    env_cfg.env.num_envs = min(env_cfg.env.num_envs, 1)
    env_cfg.terrain.num_rows = 5
    env_cfg.terrain.num_cols = 5
    env_cfg.terrain.curriculum = False
    env_cfg.noise.add_noise = False
    env_cfg.domain_rand.randomize_friction = False
    env_cfg.domain_rand.push_robots = False

    # prepare environment
    env, _ = task_registry.make_env(name=args.task, args=args, env_cfg=env_cfg)
    obs = env.get_observations()
    # load policy
    train_cfg.runner.resume = True
    ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args, train_cfg=train_cfg)
    policy = ppo_runner.get_inference_policy(device=env.device)

    # export policy as a jit module (used to run it from C++)
    if EXPORT_POLICY:
        path = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name, 'exported', 'policies')
        export_policy_as_jit(ppo_runner.alg.actor_critic, path)
        print('Exported policy as jit script to: ', path)

    for i in range(10 * int(env.max_episode_length)):
        actions = policy(obs.detach())
        obs, _, rews, dones, infos = env.step(actions.detach())


if __name__ == '__main__':
    EXPORT_POLICY = True
    RECORD_FRAMES = False
    MOVE_CAMERA = False
    args = get_args()
    play(args)
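A hedged invocation note, assuming the conventional legged_gym/scripts layout (the actual file path is suppressed in this view): the script is typically run as "python legged_gym/scripts/play.py --task go2 --load_run -1 --checkpoint -1", where --load_run -1 and --checkpoint -1 select the most recent run and saved model before the policy is replayed and exported as TorchScript.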
@@ -0,0 +1,20 @@
import numpy as np
import os
from datetime import datetime
import sys
sys.path.append("/home/unitree/unitree_rl_gym/")

import isaacgym
from legged_gym.envs import *
from legged_gym.utils import get_args, task_registry
import torch


def train(args):
    env, env_cfg = task_registry.make_env(name=args.task, args=args)
    ppo_runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args)
    ppo_runner.learn(num_learning_iterations=train_cfg.runner.max_iterations, init_at_random_ep_len=True)


if __name__ == '__main__':
    args = get_args()
    args.headless = False
    train(args)
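A hedged invocation note, assuming the conventional script location (not shown in this excerpt): training is typically launched as "python legged_gym/scripts/train.py --task go2 --num_envs 4096 --max_iterations 1500". Note that __main__ here forces args.headless back to False after parsing, so the viewer opens even if --headless is passed on the command line.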
@@ -0,0 +1,5 @@
from .helpers import class_to_dict, get_load_path, get_args, export_policy_as_jit, set_seed, update_class_from_dict
from .task_registry import task_registry
from .logger import Logger
from .math import *
from .terrain import Terrain
@@ -0,0 +1,191 @@
import os
import copy
import torch
import numpy as np
import random
from isaacgym import gymapi
from isaacgym import gymutil

from legged_gym import LEGGED_GYM_ROOT_DIR, LEGGED_GYM_ENVS_DIR


def class_to_dict(obj) -> dict:
    if not hasattr(obj, "__dict__"):
        return obj
    result = {}
    for key in dir(obj):
        if key.startswith("_"):
            continue
        element = []
        val = getattr(obj, key)
        if isinstance(val, list):
            for item in val:
                element.append(class_to_dict(item))
        else:
            element = class_to_dict(val)
        result[key] = element
    return result


def update_class_from_dict(obj, dict):
    for key, val in dict.items():
        attr = getattr(obj, key, None)
        if isinstance(attr, type):
            update_class_from_dict(attr, val)
        else:
            setattr(obj, key, val)
    return


def set_seed(seed):
    if seed == -1:
        seed = np.random.randint(0, 10000)
    print("Setting seed: {}".format(seed))

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)


def parse_sim_params(args, cfg):
    # code from Isaac Gym Preview 2
    # initialize sim params
    sim_params = gymapi.SimParams()

    # set some values from args
    if args.physics_engine == gymapi.SIM_FLEX:
        if args.device != "cpu":
            print("WARNING: Using Flex with GPU instead of PHYSX!")
    elif args.physics_engine == gymapi.SIM_PHYSX:
        sim_params.physx.use_gpu = args.use_gpu
        sim_params.physx.num_subscenes = args.subscenes
    sim_params.use_gpu_pipeline = args.use_gpu_pipeline

    # if sim options are provided in cfg, parse them and update/override above:
    if "sim" in cfg:
        gymutil.parse_sim_config(cfg["sim"], sim_params)

    # Override num_threads if passed on the command line
    if args.physics_engine == gymapi.SIM_PHYSX and args.num_threads > 0:
        sim_params.physx.num_threads = args.num_threads

    return sim_params


def get_load_path(root, load_run=-1, checkpoint=-1):
    try:
        runs = os.listdir(root)
        # TODO: sort by date to handle change of month
        runs.sort()
        if 'exported' in runs: runs.remove('exported')
        last_run = os.path.join(root, runs[-1])
    except:
        raise ValueError("No runs in this directory: " + root)
    if load_run == -1:
        load_run = last_run
    else:
        load_run = os.path.join(root, load_run)

    if checkpoint == -1:
        models = [file for file in os.listdir(load_run) if 'model' in file]
        models.sort(key=lambda m: '{0:0>15}'.format(m))
        model = models[-1]
    else:
        model = "model_{}.pt".format(checkpoint)

    load_path = os.path.join(load_run, model)
    return load_path


def update_cfg_from_args(env_cfg, cfg_train, args):
    # seed
    if env_cfg is not None:
        # num envs
        if args.num_envs is not None:
            env_cfg.env.num_envs = args.num_envs
    if cfg_train is not None:
        if args.seed is not None:
            cfg_train.seed = args.seed
        # alg runner parameters
        if args.max_iterations is not None:
            cfg_train.runner.max_iterations = args.max_iterations
        if args.resume:
            cfg_train.runner.resume = args.resume
        if args.experiment_name is not None:
            cfg_train.runner.experiment_name = args.experiment_name
        if args.run_name is not None:
            cfg_train.runner.run_name = args.run_name
        if args.load_run is not None:
            cfg_train.runner.load_run = args.load_run
        if args.checkpoint is not None:
            cfg_train.runner.checkpoint = args.checkpoint

    return env_cfg, cfg_train


def get_args():
    custom_parameters = [
        {"name": "--task", "type": str, "default": "go2", "help": "Name of the task to run (e.g. go2). Overrides config file if provided."},
        {"name": "--resume", "action": "store_true", "default": False, "help": "Resume training from a checkpoint"},
        {"name": "--experiment_name", "type": str, "help": "Name of the experiment to run or load. Overrides config file if provided."},
        {"name": "--run_name", "type": str, "help": "Name of the run. Overrides config file if provided."},
        {"name": "--load_run", "type": str, "help": "Name of the run to load when resume=True. If -1: will load the last run. Overrides config file if provided."},
        {"name": "--checkpoint", "type": int, "help": "Saved model checkpoint number. If -1: will load the last checkpoint. Overrides config file if provided."},

        {"name": "--headless", "action": "store_true", "default": False, "help": "Force display off at all times"},
        {"name": "--horovod", "action": "store_true", "default": False, "help": "Use horovod for multi-gpu training"},
        {"name": "--rl_device", "type": str, "default": "cuda:0", "help": "Device used by the RL algorithm (cpu, gpu, cuda:0, cuda:1, etc.)"},
        {"name": "--num_envs", "type": int, "help": "Number of environments to create. Overrides config file if provided."},
        {"name": "--seed", "type": int, "help": "Random seed. Overrides config file if provided."},
        {"name": "--max_iterations", "type": int, "help": "Maximum number of training iterations. Overrides config file if provided."},
    ]
    # parse arguments
    args = gymutil.parse_arguments(
        description="RL Policy",
        custom_parameters=custom_parameters)

    # name alignment
    args.sim_device_id = args.compute_device_id
    args.sim_device = args.sim_device_type
    if args.sim_device == 'cuda':
        args.sim_device += f":{args.sim_device_id}"
    return args


def export_policy_as_jit(actor_critic, path):
    if hasattr(actor_critic, 'memory_a'):
        # assumes LSTM: TODO add GRU
        exporter = PolicyExporterLSTM(actor_critic)
        exporter.export(path)
    else:
        os.makedirs(path, exist_ok=True)
        path = os.path.join(path, 'policy_1.pt')
        model = copy.deepcopy(actor_critic.actor).to('cpu')
        traced_script_module = torch.jit.script(model)
        traced_script_module.save(path)


class PolicyExporterLSTM(torch.nn.Module):
    def __init__(self, actor_critic):
        super().__init__()
        self.actor = copy.deepcopy(actor_critic.actor)
        self.is_recurrent = actor_critic.is_recurrent
        self.memory = copy.deepcopy(actor_critic.memory_a.rnn)
        self.memory.cpu()
        self.register_buffer('hidden_state', torch.zeros(self.memory.num_layers, 1, self.memory.hidden_size))
        self.register_buffer('cell_state', torch.zeros(self.memory.num_layers, 1, self.memory.hidden_size))

    def forward(self, x):
        out, (h, c) = self.memory(x.unsqueeze(0), (self.hidden_state, self.cell_state))
        self.hidden_state[:] = h
        self.cell_state[:] = c
        return self.actor(out.squeeze(0))

    @torch.jit.export
    def reset_memory(self):
        self.hidden_state[:] = 0.
        self.cell_state[:] = 0.

    def export(self, path):
        os.makedirs(path, exist_ok=True)
        path = os.path.join(path, 'policy_lstm_1.pt')
        self.to('cpu')
        traced_script_module = torch.jit.script(self)
        traced_script_module.save(path)
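A hedged consumer-side sketch for the file written by export_policy_as_jit above; the log path and observation width are placeholders (the real width comes from the env config), and the exported .pt is a plain TorchScript module:

    import torch

    policy = torch.jit.load("logs/rough_go2/exported/policies/policy_1.pt")  # path produced by play.py
    policy.eval()
    obs = torch.zeros(1, 48)   # placeholder observation width; use env_cfg.env.num_observations in practice
    with torch.no_grad():
        actions = policy(obs)  # one action vector per observation row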
@@ -0,0 +1,39 @@
import numpy as np
from collections import defaultdict
from multiprocessing import Process, Value


class Logger:
    def __init__(self, dt):
        self.state_log = defaultdict(list)
        self.rew_log = defaultdict(list)
        self.dt = dt
        self.num_episodes = 0
        self.plot_process = None

    def log_state(self, key, value):
        self.state_log[key].append(value)

    def log_states(self, dict):
        for key, value in dict.items():
            self.log_state(key, value)

    def log_rewards(self, dict, num_episodes):
        for key, value in dict.items():
            if 'rew' in key:
                self.rew_log[key].append(value.item() * num_episodes)
        self.num_episodes += num_episodes

    def reset(self):
        self.state_log.clear()
        self.rew_log.clear()

    def print_rewards(self):
        print("Average rewards per second:")
        for key, values in self.rew_log.items():
            mean = np.sum(np.array(values)) / self.num_episodes
            print(f" - {key}: {mean}")
        print(f"Total number of episodes: {self.num_episodes}")

    def __del__(self):
        if self.plot_process is not None:
            self.plot_process.kill()
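A minimal usage sketch of the Logger, assuming a 0.02 s policy step and arbitrary keys; nothing here is specific to a particular robot:

    import torch
    from legged_gym.utils import Logger

    logger = Logger(dt=0.02)
    for _ in range(100):
        logger.log_state("base_height", 0.30)   # per-step scalar states, grouped by key
    logger.log_rewards({"rew_tracking_lin_vel": torch.tensor(0.5)}, num_episodes=1)
    logger.print_rewards()                      # prints per-key reward averages over the logged episodes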
@@ -0,0 +1,26 @@
import torch
from torch import Tensor
import numpy as np
from isaacgym.torch_utils import quat_apply, normalize
from typing import Tuple


# @ torch.jit.script
def quat_apply_yaw(quat, vec):
    quat_yaw = quat.clone().view(-1, 4)
    quat_yaw[:, :2] = 0.
    quat_yaw = normalize(quat_yaw)
    return quat_apply(quat_yaw, vec)


# @ torch.jit.script
def wrap_to_pi(angles):
    angles %= 2 * np.pi
    angles -= 2 * np.pi * (angles > np.pi)
    return angles


# @ torch.jit.script
def torch_rand_sqrt_float(lower, upper, shape, device):
    # type: (float, float, Tuple[int, int], str) -> Tensor
    r = 2 * torch.rand(*shape, device=device) - 1
    r = torch.where(r < 0., -torch.sqrt(-r), torch.sqrt(r))
    r = (r + 1.) / 2.
    return (upper - lower) * r + lower
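A hedged example of the first two helpers above, assuming Isaac Gym's (x, y, z, w) quaternion convention used by quat_apply:

    import torch
    import numpy as np
    from legged_gym.utils.math import quat_apply_yaw, wrap_to_pi

    yaw_90 = torch.tensor([[0., 0., 0.7071, 0.7071]])   # pure yaw of ~90 deg, (x, y, z, w)
    forward = torch.tensor([[1., 0., 0.]])
    print(quat_apply_yaw(yaw_90, forward))               # ~[0, 1, 0]: roll/pitch components are zeroed first
    print(wrap_to_pi(torch.tensor([3 * np.pi / 2])))     # ~tensor([-1.5708]), wrapped into (-pi, pi]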
@@ -0,0 +1,130 @@
import os
from datetime import datetime
from typing import Tuple
import torch
import numpy as np
import sys
sys.path.append("/home/unitree/unitree_rl_gym/rsl_rl")

from rsl_rl.env import VecEnv
from rsl_rl.runners import OnPolicyRunner

from legged_gym import LEGGED_GYM_ROOT_DIR, LEGGED_GYM_ENVS_DIR
from .helpers import get_args, update_cfg_from_args, class_to_dict, get_load_path, set_seed, parse_sim_params
from legged_gym.envs.base.legged_robot_config import LeggedRobotCfg, LeggedRobotCfgPPO


class TaskRegistry():
    def __init__(self):
        self.task_classes = {}
        self.env_cfgs = {}
        self.train_cfgs = {}

    def register(self, name: str, task_class: VecEnv, env_cfg: LeggedRobotCfg, train_cfg: LeggedRobotCfgPPO):
        self.task_classes[name] = task_class
        self.env_cfgs[name] = env_cfg
        self.train_cfgs[name] = train_cfg

    def get_task_class(self, name: str) -> VecEnv:
        return self.task_classes[name]

    def get_cfgs(self, name) -> Tuple[LeggedRobotCfg, LeggedRobotCfgPPO]:
        train_cfg = self.train_cfgs[name]
        env_cfg = self.env_cfgs[name]
        # copy seed
        env_cfg.seed = train_cfg.seed
        return env_cfg, train_cfg

    def make_env(self, name, args=None, env_cfg=None) -> Tuple[VecEnv, LeggedRobotCfg]:
        """ Creates an environment either from a registered name or from the provided config file.

        Args:
            name (string): Name of a registered env.
            args (Args, optional): Isaac Gym command line arguments. If None, get_args() will be called. Defaults to None.
            env_cfg (Dict, optional): Environment config file used to override the registered config. Defaults to None.

        Raises:
            ValueError: Error if no registered env corresponds to 'name'

        Returns:
            isaacgym.VecTaskPython: The created environment
            Dict: the corresponding config file
        """
        # if no args passed, get command line arguments
        if args is None:
            args = get_args()
        # check if there is a registered env with that name
        if name in self.task_classes:
            task_class = self.get_task_class(name)
        else:
            raise ValueError(f"Task with name: {name} was not registered")
        if env_cfg is None:
            # load config files
            env_cfg, _ = self.get_cfgs(name)
        # override cfg from args (if specified)
        env_cfg, _ = update_cfg_from_args(env_cfg, None, args)
        set_seed(env_cfg.seed)
        # parse sim params (convert to dict first)
        sim_params = {"sim": class_to_dict(env_cfg.sim)}
        sim_params = parse_sim_params(args, sim_params)
        env = task_class(cfg=env_cfg,
                         sim_params=sim_params,
                         physics_engine=args.physics_engine,
                         sim_device=args.sim_device,
                         headless=args.headless)
        return env, env_cfg

    def make_alg_runner(self, env, name=None, args=None, train_cfg=None, log_root="default") -> Tuple[OnPolicyRunner, LeggedRobotCfgPPO]:
        """ Creates the training algorithm either from a registered name or from the provided config file.

        Args:
            env (isaacgym.VecTaskPython): The environment to train (TODO: remove from within the algorithm)
            name (string, optional): Name of a registered env. If None, the config file will be used instead. Defaults to None.
            args (Args, optional): Isaac Gym command line arguments. If None, get_args() will be called. Defaults to None.
            train_cfg (Dict, optional): Training config file. If None, 'name' will be used to get the config file. Defaults to None.
            log_root (str, optional): Logging directory for Tensorboard. Set to 'None' to avoid logging (at test time for example).
                                      Logs will be saved in <log_root>/<date_time>_<run_name>. Defaults to "default"=<path_to_LEGGED_GYM>/logs/<experiment_name>.

        Raises:
            ValueError: Error if neither 'name' nor 'train_cfg' is provided
            Warning: If both 'name' and 'train_cfg' are provided, 'name' is ignored

        Returns:
            PPO: The created algorithm
            Dict: the corresponding config file
        """
        # if no args passed, get command line arguments
        if args is None:
            args = get_args()
        # if config files are passed use them, otherwise load from the name
        if train_cfg is None:
            if name is None:
                raise ValueError("Either 'name' or 'train_cfg' must be not None")
            # load config files
            _, train_cfg = self.get_cfgs(name)
        else:
            if name is not None:
                print(f"'train_cfg' provided -> Ignoring 'name={name}'")
        # override cfg from args (if specified)
        _, train_cfg = update_cfg_from_args(None, train_cfg, args)

        if log_root == "default":
            log_root = os.path.join(LEGGED_GYM_ROOT_DIR, 'logs', train_cfg.runner.experiment_name)
            log_dir = os.path.join(log_root, datetime.now().strftime('%b%d_%H-%M-%S') + '_' + train_cfg.runner.run_name)
        elif log_root is None:
            log_dir = None
        else:
            log_dir = os.path.join(log_root, datetime.now().strftime('%b%d_%H-%M-%S') + '_' + train_cfg.runner.run_name)

        train_cfg_dict = class_to_dict(train_cfg)
        runner = OnPolicyRunner(env, train_cfg_dict, log_dir, device=args.rl_device)
        # save resume path before creating a new log_dir
        resume = train_cfg.runner.resume
        if resume:
            # load previously trained model
            resume_path = get_load_path(log_root, load_run=train_cfg.runner.load_run, checkpoint=train_cfg.runner.checkpoint)
            print(f"Loading model from: {resume_path}")
            runner.load(resume_path)
        return runner, train_cfg


# make global task registry
task_registry = TaskRegistry()
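A hedged sketch of resuming training through the registry API documented above; it mirrors the resume path used by play.py but continues learning instead of only running inference, and assumes a registered "go2" task:

    from legged_gym.utils import get_args, task_registry

    args = get_args()                               # parses --task, --load_run, --checkpoint, ...
    env, env_cfg = task_registry.make_env(name=args.task, args=args)
    _, train_cfg = task_registry.get_cfgs(name=args.task)
    train_cfg.runner.resume = True                  # load_run=-1 / checkpoint=-1 -> latest run and model
    runner, train_cfg = task_registry.make_alg_runner(env=env, name=args.task, args=args, train_cfg=train_cfg)
    runner.learn(num_learning_iterations=train_cfg.runner.max_iterations, init_at_random_ep_len=True)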
@@ -0,0 +1,157 @@
import numpy as np
from numpy.random import choice
from scipy import interpolate

from isaacgym import terrain_utils
from legged_gym.envs.base.legged_robot_config import LeggedRobotCfg


class Terrain:
    def __init__(self, cfg: LeggedRobotCfg.terrain, num_robots) -> None:

        self.cfg = cfg
        self.num_robots = num_robots
        self.type = cfg.mesh_type
        if self.type in ["none", 'plane']:
            return
        self.env_length = cfg.terrain_length
        self.env_width = cfg.terrain_width
        self.proportions = [np.sum(cfg.terrain_proportions[:i+1]) for i in range(len(cfg.terrain_proportions))]

        self.cfg.num_sub_terrains = cfg.num_rows * cfg.num_cols
        self.env_origins = np.zeros((cfg.num_rows, cfg.num_cols, 3))

        self.width_per_env_pixels = int(self.env_width / cfg.horizontal_scale)
        self.length_per_env_pixels = int(self.env_length / cfg.horizontal_scale)

        self.border = int(cfg.border_size / self.cfg.horizontal_scale)
        self.tot_cols = int(cfg.num_cols * self.width_per_env_pixels) + 2 * self.border
        self.tot_rows = int(cfg.num_rows * self.length_per_env_pixels) + 2 * self.border

        self.height_field_raw = np.zeros((self.tot_rows, self.tot_cols), dtype=np.int16)
        if cfg.curriculum:
            self.curiculum()
        elif cfg.selected:
            self.selected_terrain()
        else:
            self.randomized_terrain()

        self.heightsamples = self.height_field_raw
        if self.type == "trimesh":
            self.vertices, self.triangles = terrain_utils.convert_heightfield_to_trimesh(self.height_field_raw,
                                                                                         self.cfg.horizontal_scale,
                                                                                         self.cfg.vertical_scale,
                                                                                         self.cfg.slope_treshold)

    def randomized_terrain(self):
        for k in range(self.cfg.num_sub_terrains):
            # Env coordinates in the world
            (i, j) = np.unravel_index(k, (self.cfg.num_rows, self.cfg.num_cols))

            choice = np.random.uniform(0, 1)
            difficulty = np.random.choice([0.5, 0.75, 0.9])
            terrain = self.make_terrain(choice, difficulty)
            self.add_terrain_to_map(terrain, i, j)

    def curiculum(self):
        for j in range(self.cfg.num_cols):
            for i in range(self.cfg.num_rows):
                difficulty = i / self.cfg.num_rows
                choice = j / self.cfg.num_cols + 0.001

                terrain = self.make_terrain(choice, difficulty)
                self.add_terrain_to_map(terrain, i, j)

    def selected_terrain(self):
        terrain_type = self.cfg.terrain_kwargs.pop('type')
        for k in range(self.cfg.num_sub_terrains):
            # Env coordinates in the world
            (i, j) = np.unravel_index(k, (self.cfg.num_rows, self.cfg.num_cols))

            terrain = terrain_utils.SubTerrain("terrain",
                                               width=self.width_per_env_pixels,
                                               length=self.width_per_env_pixels,
                                               vertical_scale=self.cfg.vertical_scale,
                                               horizontal_scale=self.cfg.horizontal_scale)

            eval(terrain_type)(terrain, **self.cfg.terrain_kwargs.terrain_kwargs)
            self.add_terrain_to_map(terrain, i, j)

    def make_terrain(self, choice, difficulty):
        terrain = terrain_utils.SubTerrain("terrain",
                                           width=self.width_per_env_pixels,
                                           length=self.width_per_env_pixels,
                                           vertical_scale=self.cfg.vertical_scale,
                                           horizontal_scale=self.cfg.horizontal_scale)
        slope = difficulty * 0.4
        step_height = 0.05 + 0.18 * difficulty
        discrete_obstacles_height = 0.05 + difficulty * 0.2
        stepping_stones_size = 1.5 * (1.05 - difficulty)
        stone_distance = 0.05 if difficulty == 0 else 0.1
        gap_size = 1. * difficulty
        pit_depth = 1. * difficulty
        if choice < self.proportions[0]:
            if choice < self.proportions[0] / 2:
                slope *= -1
            terrain_utils.pyramid_sloped_terrain(terrain, slope=slope, platform_size=3.)
        elif choice < self.proportions[1]:
            terrain_utils.pyramid_sloped_terrain(terrain, slope=slope, platform_size=3.)
            terrain_utils.random_uniform_terrain(terrain, min_height=-0.05, max_height=0.05, step=0.005, downsampled_scale=0.2)
        elif choice < self.proportions[3]:
            if choice < self.proportions[2]:
                step_height *= -1
            terrain_utils.pyramid_stairs_terrain(terrain, step_width=0.31, step_height=step_height, platform_size=3.)
        elif choice < self.proportions[4]:
            num_rectangles = 20
            rectangle_min_size = 1.
            rectangle_max_size = 2.
            terrain_utils.discrete_obstacles_terrain(terrain, discrete_obstacles_height, rectangle_min_size, rectangle_max_size, num_rectangles, platform_size=3.)
        elif choice < self.proportions[5]:
            terrain_utils.stepping_stones_terrain(terrain, stone_size=stepping_stones_size, stone_distance=stone_distance, max_height=0., platform_size=4.)
        elif choice < self.proportions[6]:
            gap_terrain(terrain, gap_size=gap_size, platform_size=3.)
        else:
            pit_terrain(terrain, depth=pit_depth, platform_size=4.)

        return terrain

    def add_terrain_to_map(self, terrain, row, col):
        i = row
        j = col
        # map coordinate system
        start_x = self.border + i * self.length_per_env_pixels
        end_x = self.border + (i + 1) * self.length_per_env_pixels
        start_y = self.border + j * self.width_per_env_pixels
        end_y = self.border + (j + 1) * self.width_per_env_pixels
        self.height_field_raw[start_x:end_x, start_y:end_y] = terrain.height_field_raw

        env_origin_x = (i + 0.5) * self.env_length
        env_origin_y = (j + 0.5) * self.env_width
        x1 = int((self.env_length / 2. - 1) / terrain.horizontal_scale)
        x2 = int((self.env_length / 2. + 1) / terrain.horizontal_scale)
        y1 = int((self.env_width / 2. - 1) / terrain.horizontal_scale)
        y2 = int((self.env_width / 2. + 1) / terrain.horizontal_scale)
        env_origin_z = np.max(terrain.height_field_raw[x1:x2, y1:y2]) * terrain.vertical_scale
        self.env_origins[i, j] = [env_origin_x, env_origin_y, env_origin_z]


def gap_terrain(terrain, gap_size, platform_size=1.):
    gap_size = int(gap_size / terrain.horizontal_scale)
    platform_size = int(platform_size / terrain.horizontal_scale)

    center_x = terrain.length // 2
    center_y = terrain.width // 2
    x1 = (terrain.length - platform_size) // 2
    x2 = x1 + gap_size
    y1 = (terrain.width - platform_size) // 2
    y2 = y1 + gap_size

    terrain.height_field_raw[center_x - x2: center_x + x2, center_y - y2: center_y + y2] = -1000
    terrain.height_field_raw[center_x - x1: center_x + x1, center_y - y1: center_y + y1] = 0


def pit_terrain(terrain, depth, platform_size=1.):
    depth = int(depth / terrain.vertical_scale)
    platform_size = int(platform_size / terrain.horizontal_scale / 2)
    x1 = terrain.length // 2 - platform_size
    x2 = terrain.length // 2 + platform_size
    y1 = terrain.width // 2 - platform_size
    y2 = terrain.width // 2 + platform_size
    terrain.height_field_raw[x1:x2, y1:y2] = -depth
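A hedged construction sketch, assuming the default rough-terrain settings in LeggedRobotCfg.terrain; in normal use the LeggedRobot environment builds the Terrain internally, so this is only for inspecting the generated height field:

    from legged_gym.envs.base.legged_robot_config import LeggedRobotCfg
    from legged_gym.utils import Terrain

    cfg = LeggedRobotCfg.terrain
    cfg.num_rows, cfg.num_cols = 5, 5       # small grid for a quick smoke test
    terrain = Terrain(cfg, num_robots=16)
    print(terrain.height_field_raw.shape)   # (tot_rows, tot_cols) int16 height samples
    print(terrain.env_origins[0, 0])        # world-frame origin of sub-terrain (0, 0)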
Seven additional file diffs were suppressed because one or more lines are too long, and one because it is too large.