115 lines
4.4 KiB
Python
115 lines
4.4 KiB
Python
|
|
import sys
|
|
from isaacgym import gymapi
|
|
from isaacgym import gymutil
|
|
import numpy as np
|
|
import torch
|
|
|
|
# Base class for RL tasks
|
|
class BaseTask():
|
|
|
|
def __init__(self, cfg, sim_params, physics_engine, sim_device, headless):
|
|
self.gym = gymapi.acquire_gym()
|
|
|
|
self.sim_params = sim_params
|
|
self.physics_engine = physics_engine
|
|
self.sim_device = sim_device
|
|
sim_device_type, self.sim_device_id = gymutil.parse_device_str(self.sim_device)
|
|
self.headless = headless
|
|
|
|
# env device is GPU only if sim is on GPU and use_gpu_pipeline=True, otherwise returned tensors are copied to CPU by physX.
|
|
if sim_device_type=='cuda' and sim_params.use_gpu_pipeline:
|
|
self.device = self.sim_device
|
|
else:
|
|
self.device = 'cpu'
|
|
|
|
# graphics device for rendering, -1 for no rendering
|
|
self.graphics_device_id = self.sim_device_id
|
|
if self.headless == True:
|
|
self.graphics_device_id = -1
|
|
|
|
self.num_envs = cfg.env.num_envs
|
|
self.num_obs = cfg.env.num_observations
|
|
self.num_privileged_obs = cfg.env.num_privileged_obs
|
|
self.num_actions = cfg.env.num_actions
|
|
|
|
# optimization flags for pytorch JIT
|
|
torch._C._jit_set_profiling_mode(False)
|
|
torch._C._jit_set_profiling_executor(False)
|
|
|
|
# allocate buffers
|
|
self.obs_buf = torch.zeros(self.num_envs, self.num_obs, device=self.device, dtype=torch.float)
|
|
self.rew_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.float)
|
|
self.reset_buf = torch.ones(self.num_envs, device=self.device, dtype=torch.long)
|
|
self.episode_length_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.long)
|
|
self.time_out_buf = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool)
|
|
if self.num_privileged_obs is not None:
|
|
self.privileged_obs_buf = torch.zeros(self.num_envs, self.num_privileged_obs, device=self.device, dtype=torch.float)
|
|
else:
|
|
self.privileged_obs_buf = None
|
|
# self.num_privileged_obs = self.num_obs
|
|
|
|
self.extras = {}
|
|
|
|
# create envs, sim and viewer
|
|
self.create_sim()
|
|
self.gym.prepare_sim(self.sim)
|
|
|
|
# todo: read from config
|
|
self.enable_viewer_sync = True
|
|
self.viewer = None
|
|
|
|
# if running with a viewer, set up keyboard shortcuts and camera
|
|
if self.headless == False:
|
|
# subscribe to keyboard shortcuts
|
|
self.viewer = self.gym.create_viewer(
|
|
self.sim, gymapi.CameraProperties())
|
|
self.gym.subscribe_viewer_keyboard_event(
|
|
self.viewer, gymapi.KEY_ESCAPE, "QUIT")
|
|
self.gym.subscribe_viewer_keyboard_event(
|
|
self.viewer, gymapi.KEY_V, "toggle_viewer_sync")
|
|
|
|
def get_observations(self):
|
|
return self.obs_buf
|
|
|
|
def get_privileged_observations(self):
|
|
return self.privileged_obs_buf
|
|
|
|
def reset_idx(self, env_ids):
|
|
"""Reset selected robots"""
|
|
raise NotImplementedError
|
|
|
|
def reset(self):
|
|
""" Reset all robots"""
|
|
self.reset_idx(torch.arange(self.num_envs, device=self.device))
|
|
obs, privileged_obs, _, _, _ = self.step(torch.zeros(self.num_envs, self.num_actions, device=self.device, requires_grad=False))
|
|
return obs, privileged_obs
|
|
|
|
def step(self, actions):
|
|
raise NotImplementedError
|
|
|
|
def render(self, sync_frame_time=True):
|
|
if self.viewer:
|
|
# check for window closed
|
|
if self.gym.query_viewer_has_closed(self.viewer):
|
|
sys.exit()
|
|
|
|
# check for keyboard events
|
|
for evt in self.gym.query_viewer_action_events(self.viewer):
|
|
if evt.action == "QUIT" and evt.value > 0:
|
|
sys.exit()
|
|
elif evt.action == "toggle_viewer_sync" and evt.value > 0:
|
|
self.enable_viewer_sync = not self.enable_viewer_sync
|
|
|
|
# fetch results
|
|
if self.device != 'cpu':
|
|
self.gym.fetch_results(self.sim, True)
|
|
|
|
# step graphics
|
|
if self.enable_viewer_sync:
|
|
self.gym.step_graphics(self.sim)
|
|
self.gym.draw_viewer(self.viewer, self.sim, True)
|
|
if sync_frame_time:
|
|
self.gym.sync_frame_time(self.sim)
|
|
else:
|
|
self.gym.poll_viewer_events(self.viewer) |