isaacsim draft codes deleted

This commit is contained in:
Rooholla-Khorrambakht 2024-05-20 19:41:14 -04:00
parent fe6e183e75
commit ed5e90c55f
21 changed files with 5 additions and 1595 deletions

View File

@ -1,5 +1,4 @@
import struct
import threading
import time
import numpy as np
import numpy.linalg as LA

View File

@ -1,124 +0,0 @@
import time
import numpy as np
from B1Py import B1_ISAACSIM_CFG_PATH
from B1Py.lcm_types.unitree_lowlevel import UnitreeLowCommand, UnitreeLowState
from B1Py.sim.utils import LCMBridgeClient, NumpyMemMapDataPipe, load_config
class B1IsaacSim:
"""
Class for communication with the simulated Franka Emika FR3 robot in Isaac Sim.
It uses LCM to send joint velocity to the simulated robot and receive joint states
(joint angle, velocity, and torque) and camera images from the robot.
"""
def __init__(self, robot_id="b1"):
"""
@param robot_id: (str) The name of the robot in the Isaac Sim scene.
"""
self.lcm_bridge = LCMBridgeClient(robot_name=robot_id)
self.cfg = load_config(B1_ISAACSIM_CFG_PATH)
self.camera_pipes = {}
for camera in self.cfg["cameras"]:
for type in camera["type"]:
if type == "rgb":
shape = (camera["resolution"][1], camera["resolution"][0], 4)
else:
shape = (camera["resolution"][1], camera["resolution"][0])
self.camera_pipes[camera["name"] + "_" + type] = NumpyMemMapDataPipe(
camera["name"] + "_" + type, force=False, shape=shape
)
print(camera["name"] + "_" + type)
def LCMThreadFunc(self):
"""
Function that runs on a separate thread and handles LCM communication.
`lc.handle()` function is called when there are data available to be processed.
"""
while self.running:
rfds, wfds, efds = select.select([self.lc.fileno()], [], [], 0.5)
if rfds: # Handle only if there are data in the interface file
self.lc.handle()
def getStates(self):
"""
Get the current joint angle, velocity, and torque of the robot.
@return: (dict or None) The joint state {'q': (numpy.ndarray, shape=(n,)),
'dq': (numpy.ndarray, shape=(n,)), 'T': (numpy.ndarray, shape=(n,))}
if the latest update was received less than 0.2 seconds ago;
otherwise, return None.
"""
state = self.lcm_bridge.getStates(timeout=0.1)
if state is not None:
q = np.array(state.q)
dq = np.array(state.dq)
tau_est = np.array(state.tau_est)
contact_state = np.array(state.contact_state)
accel = np.array(state.accel)
gyro = np.array(state.gyro)
temperature = np.array(state.temperature)
quaternion = np.array(state.quaternion)
rpy = np.array(state.rpy)
gravity = np.array(state.gravity)
gt_pos = np.array(state.gt_pos)
gt_quat = np.array(state.gt_quat)
gt_lin_vel = np.array(state.gt_lin_vel)
gt_ang_vel = np.array(state.gt_ang_vel)
return {
"q": q,
"dq": dq,
"tau_est": tau_est,
"contact_state": contact_state,
"accel": accel,
"gyro": gyro,
"temperature": temperature,
"quaternion": quaternion,
"rpy": rpy,
"gravity": gravity,
"gt_pos": gt_pos,
"gt_quat": gt_quat,
"gt_lin_vel": gt_lin_vel,
"gt_ang_vel": gt_ang_vel,
}
else:
return None
def readCameras(self):
data = {key: pipe.read() for key, pipe in self.camera_pipes.items()}
return data
def sendCommands(self, kp, kd, q_des, dq_des, tau_ff):
"""
Send a joint velocity command to the robot.
@param kp: (numpy.ndarray, shape=(n,)) The proportional gain for the joint position control.
@param kd: (numpy.ndarray, shape=(n,)) The derivative gain for the joint position control.
@param q_des: (numpy.ndarray, shape=(n,)) The desired joint position.
@param dq_des: (numpy.ndarray, shape=(n,)) The desired joint velocity.
@param tau_ff: (numpy.ndarray, shape=(n,)) The feedforward torque.
"""
cmd_lcm = UnitreeLowCommand()
cmd_lcm.tau_ff = tau_ff.tolist()
cmd_lcm.kp = kp.tolist()
cmd_lcm.kd = kd.tolist()
cmd_lcm.q_des = q_des.tolist()
cmd_lcm.dq_des = dq_des.tolist()
self.lcm_bridge.sendCommands(cmd_lcm)
def close(self):
"""
Stop the LCM thread, unsubscribe from the LCM topic,
and effectively shut down the interface.
"""
self.lcm_bridge.close()
def reset(self):
time.sleep(0.3)
for i in range(100):
time.sleep(0.01)
self.sendCommands(np.zeros(9))
state = self.getStates()

View File

@ -1,272 +0,0 @@
from collections import deque
from typing import Optional
import numpy as np
from omni.isaac.core.articulations import Articulation
from omni.isaac.core.utils.prims import define_prim
from omni.isaac.sensor import ContactSensor, IMUSensor
import Go2Py
from Go2Py.sim.isaacsim.lcm_types.unitree_lowlevel import UnitreeLowState
class UnitreeGo2(Articulation):
def __init__(
self,
prim_path: str,
name: str = "go2_quadruped",
physics_dt: Optional[float] = 1 / 400.0,
position: Optional[np.ndarray] = None,
orientation: Optional[np.ndarray] = None,
usd_path: Optional[str] = None,
) -> None:
"""
[Summary]
initialize robot, set up sensors and interfaces
Args:
prim_path {str} -- prim path of the robot on the stage
name {str} -- name of the quadruped
physics_dt {float} -- physics downtime of the controller
position {np.ndarray} -- position of the robot
orientation {np.ndarray} -- orientation of the robot
"""
self._prim_path = prim_path
prim = define_prim(self._prim_path, "Xform")
self.usd_path = usd_path
# breakpoint()
if self.usd_path is None:
prim.GetReferences().AddReference(Go2Py.GO2_USD_PATH)
else:
prim.GetReferences().AddReference(self.usd_path)
breakpoint()
super().__init__(
prim_path=self._prim_path + '/Go2/base',
name=name,
position=position,
orientation=orientation,
)
# contact sensor setup
self.feet_order = ["FL", "RL", "FR", "RR"]
self.feet_path = [
self._prim_path + "/Go2/base/FL_calf",
self._prim_path + "/Go2/base/FR_calf",
self._prim_path + "/Go2/base/RL_calf",
self._prim_path + "/Go2/base/RR_calf",
]
self.color = [(1, 0, 0, 1), (0, 1, 0, 1), (0, 0, 1, 1), (1, 1, 0, 1)]
self._contact_sensors = [None] * 4
for i in range(4):
self._contact_sensors[i] = ContactSensor(
prim_path=self.feet_path[i] + "/sensor",
min_threshold=0,
max_threshold=1000000,
radius=0.043,
dt=physics_dt,
)
self.foot_force = np.zeros(4)
self.enable_foot_filter = True
self._FILTER_WINDOW_SIZE = 20
self._foot_filters = [deque(), deque(), deque(), deque()]
# imu sensor setup
self.imu_path = self._prim_path + "/Go2/base/base"
self._imu_sensor = IMUSensor(
prim_path=self.imu_path + "/imu_sensor",
name="imu",
dt=physics_dt,
translation=np.array([0, 0, 0]),
orientation=np.array([1, 0, 0, 0]),
)
self.accel = np.zeros((3,))
self.gyro = np.zeros((3,))
# Translation maps between Isaac and Bullet
# self.bullet_joint_order = ['FL_hip_joint', 'FL_thigh_joint', 'FL_calf_joint',
# 'RL_hip_joint', 'RL_thigh_joint', 'RL_calf_joint',
# 'FR_hip_joint', 'FR_thigh_joint', 'FR_calf_joint',
# 'RR_hip_joint', 'RR_thigh_joint', 'RR_calf_joint']
self.bullet_joint_order = [
"FL_hip_joint",
"FL_thigh_joint",
"FL_calf_joint",
"FR_hip_joint",
"FR_thigh_joint",
"FR_calf_joint",
"RL_hip_joint",
"RL_thigh_joint",
"RL_calf_joint",
"RR_hip_joint",
"RR_thigh_joint",
"RR_calf_joint",
]
self.isaac_joint_order = [
"FL_hip_joint",
"FR_hip_joint",
"RL_hip_joint",
"RR_hip_joint",
"FL_thigh_joint",
"FR_thigh_joint",
"RL_thigh_joint",
"RR_thigh_joint",
"FL_calf_joint",
"FR_calf_joint",
"RL_calf_joint",
"RR_calf_joint",
]
self.isaac_name_2_index = {s: i for i, s in enumerate(self.isaac_joint_order)}
self.bullet_name_2_index = {s: i for i, s in enumerate(self.bullet_joint_order)}
self.to_bullet_index = np.array(
[self.isaac_name_2_index[id] for id in self.bullet_joint_order]
)
self.to_isaac_index = np.array(
[self.bullet_name_2_index[id] for id in self.isaac_joint_order]
)
self.tau_est = np.zeros((12,))
self.state = UnitreeLowState()
self.init_pos = np.array([0.0, 0.0, 0.6])
self.init_quat = np.array([0.0, 0.0, 0.0, 1.0])
self.init_joint_pos = np.array(
[0.2, 0.8, -1.5, -0.2, 0.8, -1.5, 0.2, 1.0, -1.6, -0.2, 1.0, -1.6]
)
return
def toIsaacOrder(self, x):
return x[self.to_isaac_index, ...]
def toBulletOrder(self, x):
return x[self.to_bullet_index, ...]
def setState(self, pos, quat, q) -> None:
"""[Summary]
Set the kinematic state of the robot.
Args:
pos {ndarray} -- The position of the robot (x, y, z)
quat {ndarray} -- The orientation of the robot (qx, qy, qz, qw)
q {ndarray} -- Joint angles of the robot in standard Pinocchio order
Raises:
RuntimeError: When the DC Toolbox interface has not been configured.
"""
self.set_world_pose(position=pos, orientation=quat[[3, 0, 1, 2]])
self.set_linear_velocity(np.zeros((3,)))
self.set_angular_velocity(np.zeros((3,)))
self.set_joint_positions(
positions=np.asarray(self.toIsaacOrder(q), dtype=np.float32)
)
self.set_joint_velocities(
velocities=np.asarray(self.toIsaacOrder(np.zeros((12,))), dtype=np.float32)
)
self.set_joint_efforts(np.zeros_like(q))
return
def initialize(self, physics_sim_view=None) -> None:
"""[summary]
initialize dc interface, set up drive mode and initial robot state
"""
super().initialize(physics_sim_view=physics_sim_view)
self.get_articulation_controller().set_effort_modes("force")
self.get_articulation_controller().switch_control_mode("effort")
self.setState(self.init_pos, self.init_quat, self.init_joint_pos)
for i in range(4):
self._contact_sensors[i].initialize()
return
def readContactSensor(self) -> None:
"""[summary]
Updates processed contact sensor data from the robot feet, store them in member variable foot_force
"""
# Order: FL, RL, FR, RR
for i in range(len(self.feet_path)):
frame = self._contact_sensors[i].get_current_frame()
if "force" in frame:
if self.enable_foot_filter:
self._foot_filters[i].append(frame["force"])
if len(self._foot_filters[i]) > self._FILTER_WINDOW_SIZE:
self._foot_filters[i].popleft()
self.foot_force[i] = np.mean(self._foot_filters[i])
else:
self.foot_force[i] = frame["force"]
return self.foot_force
def readIMUSensor(self) -> None:
"""[summary]
Updates processed imu sensor data from the robot body, store them in member variable base_lin and ang_vel
"""
frame = self._imu_sensor.get_current_frame()
self.accel = frame["lin_acc"]
self.gyro = frame["ang_vel"]
return self.accel, self.gyro
def readStates(self):
contact_forces = self.readContactSensor()
accel, gyro = self.readIMUSensor()
# joint pos and vel from the DC interface
joint_state = super().get_joints_state()
joint_pos = self.toBulletOrder(joint_state.positions)
joint_vel = self.toBulletOrder(joint_state.velocities)
# base frame
base_pose = self.get_world_pose()
base_pos = base_pose[0]
base_quat = base_pose[1][[1, 2, 3, 0]]
base_lin_vel = self.get_linear_velocity()
base_ang_vel = self.get_angular_velocity()
# assign to state objects
self.state.accel = accel.tolist()
self.state.gyro = gyro.tolist()
self.state.q = joint_pos.tolist()
self.state.dq = joint_vel.tolist()
self.state.tau_est = self.tau_est
self.state.quaternion = base_quat.tolist()
self.state.gt_pos = base_pos.tolist()
self.state.gt_quat = base_quat.tolist()
self.state.gt_lin_vel = base_lin_vel.tolist()
self.state.gt_ang_vel = base_ang_vel.tolist()
self.state.contact_state = contact_forces.tolist()
# projected_gravity = pp.SO3(base_quat).matrix().T @ np.array(
# [0.0, 0.0, -1.0]
# ).reshape(3, 1)
# self.state.gravity = (
# projected_gravity.squeeze().numpy().astype(np.float32).tolist()
# )
return self.state
def setCommands(self, cmd):
"""[summary]
sets the joint torques
Argument:
action {np.ndarray} -- Joint torque command
"""
q = np.array(self.state.q)
dq = np.array(self.state.dq)
kp = np.array(cmd.kp)
kd = np.array(cmd.kd)
tau_ff = np.array(cmd.tau_ff)
q_des = np.array(cmd.q_des)
dq_des = np.array(cmd.dq_des)
tau = (q_des - q) * kp + (dq_des - dq) * kd + tau_ff
self.set_joint_efforts(np.asarray(self.toIsaacOrder(tau), dtype=np.float32))
self.tau_est = tau
return
def step(self, cmd):
self.readStates()
self.setCommands(cmd)
return self.state

View File

@ -1,205 +0,0 @@
# launch Isaac Sim before any other imports
# default first two lines in any standalone application
from Go2Py.sim.utils import (
NumpyMemMapDataPipe,
load_config,
simulationManager,
)
from Go2Py.sim.isaacsim.utils import AnnotatorManager
from Go2Py.sim.isaacsim.go2 import UnitreeGo2
from Go2Py.sim.isaacsim.lcm_types.unitree_lowlevel import UnitreeLowCommand
import Go2Py
from omni.isaac.sensor import RotatingLidarPhysX
from omni.isaac.core.utils.prims import define_prim, get_prim_at_path
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.core import World
import pickle
import sys
import time
import numpy as np
from omni.isaac.kit import SimulationApp
simulation_app = SimulationApp({"headless": False}) # we can also run as headless.
# import cv2
cfg = load_config(Go2Py.GO2_ISAACSIM_CFG_PATH)
robots = cfg["robots"]
if cfg["cameras"] is None:
cameras = []
else:
cameras = cfg["cameras"]
env_cfg = cfg["environment"]
PHYSICS_DT = 1 / 200
RENDERING_DT = 1 / 200
# create the world
world = World(
physics_dt=cfg["environment"]["simulation_dt"],
rendering_dt=cfg["environment"]["rendering_dt"],
)
assets_root_path = get_assets_root_path()
if assets_root_path is None:
print("Could not find Isaac Sim assets folder")
prim = get_prim_at_path(env_cfg["prim_path"])
print("Adding the environment")
if not prim.IsValid():
prim = define_prim(env_cfg["prim_path"], "Xform")
asset_path = (
assets_root_path + env_cfg["usd_path"]
if env_cfg["buildin"] is True
else env_cfg["usd_path"]
)
prim.GetReferences().AddReference(asset_path)
print("Adding the robot")
go2 = world.scene.add(
UnitreeGo2(
prim_path=robots[0]["prim_path"],
usd_path=(robots[0]["usd_path"] if robots[0]["usd_path"] != "" else None),
name=robots[0]["name"],
position=np.array(robots[0]["position"]),
physics_dt=cfg["environment"]["simulation_dt"],
)
)
# Add Lidar if required
if robots[0]["lidar"]:
lidar = world.scene.add(
RotatingLidarPhysX(
prim_path="/World/Env/GO2/Go2/base/base/lidar",
name="lidar",
translation=[0.16, 0.0, 0.14],
orientation=[0.0, 0.0, 0.0, 1.0],
)
)
lidar.add_depth_data_to_frame()
lidar.add_point_cloud_data_to_frame()
lidar.set_rotation_frequency(0)
lidar.set_resolution([0.4, 2])
# lidar.enable_visualization()
lidar.prim.GetAttribute("highLod").Set(True)
lidar.prim.GetAttribute("highLod").Set(True)
lidar_data_pipe = NumpyMemMapDataPipe(
"lidar_data_pipe", force=True, dtype="float32", shape=(900, 16, 3)
)
# Add cameras if required
print("Adding cameras")
ann = AnnotatorManager(world)
for camera in cameras:
ann.registerCamera(
camera["prim_path"],
camera["name"],
translation=camera["translation"], # xyz
orientation=camera["orientation"], # xyzw
resolution=camera["resolution"],
)
for type in camera["type"]:
ann.registerAnnotator(type, camera["name"])
ann.setClippingRange(camera["name"], 0.2, 1000000.0)
ann.setFocalLength(camera["name"], 28)
# Add the shared memory data channels for image and LiDAR data
print("Creating shared memory data pipes")
camera_pipes = {}
for camera in cameras:
for type in camera["type"]:
if type == "pointcloud":
pipe = NumpyMemMapDataPipe(
camera["name"] + "_" + type,
force=True,
dtype="float32",
shape=(camera["resolution"][1] * camera["resolution"][0], 3),
)
camera_pipes[camera["name"] + "_" + type] = pipe
elif type == "rgb":
pipe = NumpyMemMapDataPipe(
camera["name"] + "_" + type,
force=True,
dtype="uint8",
shape=(camera["resolution"][1], camera["resolution"][0], 4),
)
camera_pipes[camera["name"] + "_" + type] = pipe
elif type == "distance_to_camera":
pipe = NumpyMemMapDataPipe(
camera["name"] + "_" + type,
force=True,
dtype="uint8",
shape=(camera["resolution"][1], camera["resolution"][0]),
)
camera_pipes[camera["name"] + "_" + type] = pipe
# lcm_server = LCMBridgeServer(robot_name="b1")
cmd_stamp = time.time()
cmd_stamp_old = cmd_stamp
cmd = UnitreeLowCommand()
cmd.kd = 12 * [2.5]
cmd.kp = 12 * [100]
cmd.dq_des = 12 * [0]
cmd.q_des = go2.init_joint_pos
world.reset()
go2.initialize()
# while simulation_app.is_running():
# world.step(render=True)
# breakpoint()
# sim_manager = simulationManager(
# robot=go2,
# lcm_server=lcm_server,
# default_cmd=cmd,
# physics_dt=PHYSICS_DT,
# lcm_timeout=1e-4,
# )
counter = 0
while simulation_app.is_running():
# sim_manager.step(counter*PHYSICS_DT)
# Step the world with rendering 50 times per second
# sim_manager.step(counter * PHYSICS_DT)
if counter % 4 == 0:
world.step(render=True)
if robots[0]["lidar"]:
pc = lidar.get_current_frame()["point_cloud"]
lidar_data_pipe.write(pc, match_length=True)
# Push the sensor data to the shared memory pipes
for camera in cameras:
for type in camera["type"]:
data = ann.getData(f"{camera['name']}:{type}")
if type == "pointcloud":
payload = data["data"]
if payload.shape[0]:
camera_pipes[camera["name"] + "_" + type].write(
np.zeros(
(camera["resolution"][1] * camera["resolution"][0], 3)
),
match_length=True,
)
camera_pipes[camera["name"] + "_" + type].write(
payload, match_length=True
)
else:
if data.shape[0]:
camera_pipes[camera["name"] + "_" + type].write(
data, match_length=False
)
else:
world.step(render=False)
counter += 1
simulation_app.close() # close Isaac Sim

View File

@ -1,9 +0,0 @@
package unitree_lowlevel;
struct UnitreeLowCommand {
double stamp;
float tau_ff[12];
float q_des[12];
float dq_des[12];
float kp[12];
float kd[12];
}

View File

@ -1,18 +0,0 @@
package unitree_lowlevel;
struct UnitreeLowState {
double stamp;
float q[12];
float dq[12];
float tau_est[12];
float contact_state[4];
float accel[3];
float gyro[3];
float temperature;
float quaternion[4];
float rpy[3];
float gravity[3];
float gt_pos[3];
float gt_quat[4];
float gt_lin_vel[3];
float gt_ang_vel[3];
}

View File

@ -1,12 +0,0 @@
#!/bin/bash
GREEN='\033[0;32m'
NC='\033[0m' # No Color
echo -e "${GREEN} Starting LCM type generation...${NC}"
# Clean
rm -r unitree_lowlevel
# Make
lcm-gen -xp *.lcm
echo -e "${GREEN} Done with LCM type generation${NC}"

View File

@ -1,193 +0,0 @@
/** THIS IS AN AUTOMATICALLY GENERATED FILE. DO NOT MODIFY
* BY HAND!!
*
* Generated by lcm-gen
**/
#ifndef __unitree_lowlevel_UnitreeLowCommand_hpp__
#define __unitree_lowlevel_UnitreeLowCommand_hpp__
#include <lcm/lcm_coretypes.h>
namespace unitree_lowlevel
{
class UnitreeLowCommand
{
public:
double stamp;
float tau_ff[12];
float q_des[12];
float dq_des[12];
float kp[12];
float kd[12];
public:
/**
* Encode a message into binary form.
*
* @param buf The output buffer.
* @param offset Encoding starts at this byte offset into @p buf.
* @param maxlen Maximum number of bytes to write. This should generally be
* equal to getEncodedSize().
* @return The number of bytes encoded, or <0 on error.
*/
inline int encode(void *buf, int offset, int maxlen) const;
/**
* Check how many bytes are required to encode this message.
*/
inline int getEncodedSize() const;
/**
* Decode a message from binary form into this instance.
*
* @param buf The buffer containing the encoded message.
* @param offset The byte offset into @p buf where the encoded message starts.
* @param maxlen The maximum number of bytes to read while decoding.
* @return The number of bytes decoded, or <0 if an error occurred.
*/
inline int decode(const void *buf, int offset, int maxlen);
/**
* Retrieve the 64-bit fingerprint identifying the structure of the message.
* Note that the fingerprint is the same for all instances of the same
* message type, and is a fingerprint on the message type definition, not on
* the message contents.
*/
inline static int64_t getHash();
/**
* Returns "UnitreeLowCommand"
*/
inline static const char* getTypeName();
// LCM support functions. Users should not call these
inline int _encodeNoHash(void *buf, int offset, int maxlen) const;
inline int _getEncodedSizeNoHash() const;
inline int _decodeNoHash(const void *buf, int offset, int maxlen);
inline static uint64_t _computeHash(const __lcm_hash_ptr *p);
};
int UnitreeLowCommand::encode(void *buf, int offset, int maxlen) const
{
int pos = 0, tlen;
int64_t hash = getHash();
tlen = __int64_t_encode_array(buf, offset + pos, maxlen - pos, &hash, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = this->_encodeNoHash(buf, offset + pos, maxlen - pos);
if (tlen < 0) return tlen; else pos += tlen;
return pos;
}
int UnitreeLowCommand::decode(const void *buf, int offset, int maxlen)
{
int pos = 0, thislen;
int64_t msg_hash;
thislen = __int64_t_decode_array(buf, offset + pos, maxlen - pos, &msg_hash, 1);
if (thislen < 0) return thislen; else pos += thislen;
if (msg_hash != getHash()) return -1;
thislen = this->_decodeNoHash(buf, offset + pos, maxlen - pos);
if (thislen < 0) return thislen; else pos += thislen;
return pos;
}
int UnitreeLowCommand::getEncodedSize() const
{
return 8 + _getEncodedSizeNoHash();
}
int64_t UnitreeLowCommand::getHash()
{
static int64_t hash = static_cast<int64_t>(_computeHash(NULL));
return hash;
}
const char* UnitreeLowCommand::getTypeName()
{
return "UnitreeLowCommand";
}
int UnitreeLowCommand::_encodeNoHash(void *buf, int offset, int maxlen) const
{
int pos = 0, tlen;
tlen = __double_encode_array(buf, offset + pos, maxlen - pos, &this->stamp, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->tau_ff[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->q_des[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->dq_des[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->kp[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->kd[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
return pos;
}
int UnitreeLowCommand::_decodeNoHash(const void *buf, int offset, int maxlen)
{
int pos = 0, tlen;
tlen = __double_decode_array(buf, offset + pos, maxlen - pos, &this->stamp, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->tau_ff[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->q_des[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->dq_des[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->kp[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->kd[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
return pos;
}
int UnitreeLowCommand::_getEncodedSizeNoHash() const
{
int enc_size = 0;
enc_size += __double_encoded_array_size(NULL, 1);
enc_size += __float_encoded_array_size(NULL, 12);
enc_size += __float_encoded_array_size(NULL, 12);
enc_size += __float_encoded_array_size(NULL, 12);
enc_size += __float_encoded_array_size(NULL, 12);
enc_size += __float_encoded_array_size(NULL, 12);
return enc_size;
}
uint64_t UnitreeLowCommand::_computeHash(const __lcm_hash_ptr *)
{
uint64_t hash = 0x3fa55736ccae518dLL;
return (hash<<1) + ((hash>>63)&1);
}
}
#endif

View File

@ -1,88 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class UnitreeLowCommand:
__slots__ = ["stamp", "tau_ff", "q_des", "dq_des", "kp", "kd"]
__typenames__ = ["double", "float", "float", "float", "float", "float"]
__dimensions__ = [None, [12], [12], [12], [12], [12]]
def __init__(self):
self.stamp = 0.0
self.tau_ff = [0.0 for dim0 in range(12)]
self.q_des = [0.0 for dim0 in range(12)]
self.dq_des = [0.0 for dim0 in range(12)]
self.kp = [0.0 for dim0 in range(12)]
self.kd = [0.0 for dim0 in range(12)]
def encode(self):
buf = BytesIO()
buf.write(UnitreeLowCommand._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(struct.pack(">d", self.stamp))
buf.write(struct.pack(">12f", *self.tau_ff[:12]))
buf.write(struct.pack(">12f", *self.q_des[:12]))
buf.write(struct.pack(">12f", *self.dq_des[:12]))
buf.write(struct.pack(">12f", *self.kp[:12]))
buf.write(struct.pack(">12f", *self.kd[:12]))
def decode(data):
if hasattr(data, "read"):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != UnitreeLowCommand._get_packed_fingerprint():
raise ValueError("Decode error")
return UnitreeLowCommand._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = UnitreeLowCommand()
self.stamp = struct.unpack(">d", buf.read(8))[0]
self.tau_ff = struct.unpack(">12f", buf.read(48))
self.q_des = struct.unpack(">12f", buf.read(48))
self.dq_des = struct.unpack(">12f", buf.read(48))
self.kp = struct.unpack(">12f", buf.read(48))
self.kd = struct.unpack(">12f", buf.read(48))
return self
_decode_one = staticmethod(_decode_one)
def _get_hash_recursive(parents):
if UnitreeLowCommand in parents:
return 0
tmphash = (0x3FA55736CCAE518D) & 0xFFFFFFFFFFFFFFFF
tmphash = (
((tmphash << 1) & 0xFFFFFFFFFFFFFFFF) + (tmphash >> 63)
) & 0xFFFFFFFFFFFFFFFF
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if UnitreeLowCommand._packed_fingerprint is None:
UnitreeLowCommand._packed_fingerprint = struct.pack(
">Q", UnitreeLowCommand._get_hash_recursive([])
)
return UnitreeLowCommand._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)
def get_hash(self):
"""Get the LCM hash of the struct"""
return struct.unpack(">Q", UnitreeLowCommand._get_packed_fingerprint())[0]

View File

@ -1,274 +0,0 @@
/** THIS IS AN AUTOMATICALLY GENERATED FILE. DO NOT MODIFY
* BY HAND!!
*
* Generated by lcm-gen
**/
#ifndef __unitree_lowlevel_UnitreeLowState_hpp__
#define __unitree_lowlevel_UnitreeLowState_hpp__
#include <lcm/lcm_coretypes.h>
namespace unitree_lowlevel
{
class UnitreeLowState
{
public:
double stamp;
float q[12];
float dq[12];
float tau_est[12];
float contact_state[4];
float accel[3];
float gyro[3];
float temperature;
float quaternion[4];
float rpy[3];
float gravity[3];
float gt_pos[3];
float gt_quat[4];
float gt_lin_vel[3];
float gt_ang_vel[3];
public:
/**
* Encode a message into binary form.
*
* @param buf The output buffer.
* @param offset Encoding starts at this byte offset into @p buf.
* @param maxlen Maximum number of bytes to write. This should generally be
* equal to getEncodedSize().
* @return The number of bytes encoded, or <0 on error.
*/
inline int encode(void *buf, int offset, int maxlen) const;
/**
* Check how many bytes are required to encode this message.
*/
inline int getEncodedSize() const;
/**
* Decode a message from binary form into this instance.
*
* @param buf The buffer containing the encoded message.
* @param offset The byte offset into @p buf where the encoded message starts.
* @param maxlen The maximum number of bytes to read while decoding.
* @return The number of bytes decoded, or <0 if an error occurred.
*/
inline int decode(const void *buf, int offset, int maxlen);
/**
* Retrieve the 64-bit fingerprint identifying the structure of the message.
* Note that the fingerprint is the same for all instances of the same
* message type, and is a fingerprint on the message type definition, not on
* the message contents.
*/
inline static int64_t getHash();
/**
* Returns "UnitreeLowState"
*/
inline static const char* getTypeName();
// LCM support functions. Users should not call these
inline int _encodeNoHash(void *buf, int offset, int maxlen) const;
inline int _getEncodedSizeNoHash() const;
inline int _decodeNoHash(const void *buf, int offset, int maxlen);
inline static uint64_t _computeHash(const __lcm_hash_ptr *p);
};
int UnitreeLowState::encode(void *buf, int offset, int maxlen) const
{
int pos = 0, tlen;
int64_t hash = getHash();
tlen = __int64_t_encode_array(buf, offset + pos, maxlen - pos, &hash, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = this->_encodeNoHash(buf, offset + pos, maxlen - pos);
if (tlen < 0) return tlen; else pos += tlen;
return pos;
}
int UnitreeLowState::decode(const void *buf, int offset, int maxlen)
{
int pos = 0, thislen;
int64_t msg_hash;
thislen = __int64_t_decode_array(buf, offset + pos, maxlen - pos, &msg_hash, 1);
if (thislen < 0) return thislen; else pos += thislen;
if (msg_hash != getHash()) return -1;
thislen = this->_decodeNoHash(buf, offset + pos, maxlen - pos);
if (thislen < 0) return thislen; else pos += thislen;
return pos;
}
int UnitreeLowState::getEncodedSize() const
{
return 8 + _getEncodedSizeNoHash();
}
int64_t UnitreeLowState::getHash()
{
static int64_t hash = static_cast<int64_t>(_computeHash(NULL));
return hash;
}
const char* UnitreeLowState::getTypeName()
{
return "UnitreeLowState";
}
int UnitreeLowState::_encodeNoHash(void *buf, int offset, int maxlen) const
{
int pos = 0, tlen;
tlen = __double_encode_array(buf, offset + pos, maxlen - pos, &this->stamp, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->q[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->dq[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->tau_est[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->contact_state[0], 4);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->accel[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->gyro[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->temperature, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->quaternion[0], 4);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->rpy[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->gravity[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->gt_pos[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->gt_quat[0], 4);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->gt_lin_vel[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_encode_array(buf, offset + pos, maxlen - pos, &this->gt_ang_vel[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
return pos;
}
int UnitreeLowState::_decodeNoHash(const void *buf, int offset, int maxlen)
{
int pos = 0, tlen;
tlen = __double_decode_array(buf, offset + pos, maxlen - pos, &this->stamp, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->q[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->dq[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->tau_est[0], 12);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->contact_state[0], 4);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->accel[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->gyro[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->temperature, 1);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->quaternion[0], 4);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->rpy[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->gravity[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->gt_pos[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->gt_quat[0], 4);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->gt_lin_vel[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
tlen = __float_decode_array(buf, offset + pos, maxlen - pos, &this->gt_ang_vel[0], 3);
if(tlen < 0) return tlen; else pos += tlen;
return pos;
}
int UnitreeLowState::_getEncodedSizeNoHash() const
{
int enc_size = 0;
enc_size += __double_encoded_array_size(NULL, 1);
enc_size += __float_encoded_array_size(NULL, 12);
enc_size += __float_encoded_array_size(NULL, 12);
enc_size += __float_encoded_array_size(NULL, 12);
enc_size += __float_encoded_array_size(NULL, 4);
enc_size += __float_encoded_array_size(NULL, 3);
enc_size += __float_encoded_array_size(NULL, 3);
enc_size += __float_encoded_array_size(NULL, 1);
enc_size += __float_encoded_array_size(NULL, 4);
enc_size += __float_encoded_array_size(NULL, 3);
enc_size += __float_encoded_array_size(NULL, 3);
enc_size += __float_encoded_array_size(NULL, 3);
enc_size += __float_encoded_array_size(NULL, 4);
enc_size += __float_encoded_array_size(NULL, 3);
enc_size += __float_encoded_array_size(NULL, 3);
return enc_size;
}
uint64_t UnitreeLowState::_computeHash(const __lcm_hash_ptr *)
{
uint64_t hash = 0x63fba9ef911d1605LL;
return (hash<<1) + ((hash>>63)&1);
}
}
#endif

View File

@ -1,163 +0,0 @@
"""LCM type definitions
This file automatically generated by lcm.
DO NOT MODIFY BY HAND!!!!
"""
try:
import cStringIO.StringIO as BytesIO
except ImportError:
from io import BytesIO
import struct
class UnitreeLowState:
__slots__ = [
"stamp",
"q",
"dq",
"tau_est",
"contact_state",
"accel",
"gyro",
"temperature",
"quaternion",
"rpy",
"gravity",
"gt_pos",
"gt_quat",
"gt_lin_vel",
"gt_ang_vel",
]
__typenames__ = [
"double",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
"float",
]
__dimensions__ = [
None,
[12],
[12],
[12],
[4],
[3],
[3],
None,
[4],
[3],
[3],
[3],
[4],
[3],
[3],
]
def __init__(self):
self.stamp = 0.0
self.q = [0.0 for dim0 in range(12)]
self.dq = [0.0 for dim0 in range(12)]
self.tau_est = [0.0 for dim0 in range(12)]
self.contact_state = [0.0 for dim0 in range(4)]
self.accel = [0.0 for dim0 in range(3)]
self.gyro = [0.0 for dim0 in range(3)]
self.temperature = 0.0
self.quaternion = [0.0 for dim0 in range(4)]
self.rpy = [0.0 for dim0 in range(3)]
self.gravity = [0.0 for dim0 in range(3)]
self.gt_pos = [0.0 for dim0 in range(3)]
self.gt_quat = [0.0 for dim0 in range(4)]
self.gt_lin_vel = [0.0 for dim0 in range(3)]
self.gt_ang_vel = [0.0 for dim0 in range(3)]
def encode(self):
buf = BytesIO()
buf.write(UnitreeLowState._get_packed_fingerprint())
self._encode_one(buf)
return buf.getvalue()
def _encode_one(self, buf):
buf.write(struct.pack(">d", self.stamp))
buf.write(struct.pack(">12f", *self.q[:12]))
buf.write(struct.pack(">12f", *self.dq[:12]))
buf.write(struct.pack(">12f", *self.tau_est[:12]))
buf.write(struct.pack(">4f", *self.contact_state[:4]))
buf.write(struct.pack(">3f", *self.accel[:3]))
buf.write(struct.pack(">3f", *self.gyro[:3]))
buf.write(struct.pack(">f", self.temperature))
buf.write(struct.pack(">4f", *self.quaternion[:4]))
buf.write(struct.pack(">3f", *self.rpy[:3]))
buf.write(struct.pack(">3f", *self.gravity[:3]))
buf.write(struct.pack(">3f", *self.gt_pos[:3]))
buf.write(struct.pack(">4f", *self.gt_quat[:4]))
buf.write(struct.pack(">3f", *self.gt_lin_vel[:3]))
buf.write(struct.pack(">3f", *self.gt_ang_vel[:3]))
def decode(data):
if hasattr(data, "read"):
buf = data
else:
buf = BytesIO(data)
if buf.read(8) != UnitreeLowState._get_packed_fingerprint():
raise ValueError("Decode error")
return UnitreeLowState._decode_one(buf)
decode = staticmethod(decode)
def _decode_one(buf):
self = UnitreeLowState()
self.stamp = struct.unpack(">d", buf.read(8))[0]
self.q = struct.unpack(">12f", buf.read(48))
self.dq = struct.unpack(">12f", buf.read(48))
self.tau_est = struct.unpack(">12f", buf.read(48))
self.contact_state = struct.unpack(">4f", buf.read(16))
self.accel = struct.unpack(">3f", buf.read(12))
self.gyro = struct.unpack(">3f", buf.read(12))
self.temperature = struct.unpack(">f", buf.read(4))[0]
self.quaternion = struct.unpack(">4f", buf.read(16))
self.rpy = struct.unpack(">3f", buf.read(12))
self.gravity = struct.unpack(">3f", buf.read(12))
self.gt_pos = struct.unpack(">3f", buf.read(12))
self.gt_quat = struct.unpack(">4f", buf.read(16))
self.gt_lin_vel = struct.unpack(">3f", buf.read(12))
self.gt_ang_vel = struct.unpack(">3f", buf.read(12))
return self
_decode_one = staticmethod(_decode_one)
def _get_hash_recursive(parents):
if UnitreeLowState in parents:
return 0
tmphash = (0x63FBA9EF911D1605) & 0xFFFFFFFFFFFFFFFF
tmphash = (
((tmphash << 1) & 0xFFFFFFFFFFFFFFFF) + (tmphash >> 63)
) & 0xFFFFFFFFFFFFFFFF
return tmphash
_get_hash_recursive = staticmethod(_get_hash_recursive)
_packed_fingerprint = None
def _get_packed_fingerprint():
if UnitreeLowState._packed_fingerprint is None:
UnitreeLowState._packed_fingerprint = struct.pack(
">Q", UnitreeLowState._get_hash_recursive([])
)
return UnitreeLowState._packed_fingerprint
_get_packed_fingerprint = staticmethod(_get_packed_fingerprint)
def get_hash(self):
"""Get the LCM hash of the struct"""
return struct.unpack(">Q", UnitreeLowState._get_packed_fingerprint())[0]

View File

@ -1,7 +0,0 @@
"""LCM package __init__.py file
This file automatically generated by lcm-gen.
DO NOT MODIFY BY HAND!!!!
"""
from .UnitreeLowCommand import UnitreeLowCommand
from .UnitreeLowState import UnitreeLowState

View File

@ -1,29 +0,0 @@
environment:
simulation_dt: 0.01
rendering_dt: 0.02
prim_path: "/World/Env"
buildin: true
usd_path: "/Isaac/Environments/Grid/default_environment.usd" # /Isaac/Environments/Simple_Warehouse/full_warehouse.usd
synchronous_mode: false # if true, the simulator will wait for the control commands from the LCM channel for 0.1 seconds
robots:
- prim_path: "/World/Env/GO2"
usd_path: ""
name: "go2"
usd_file: "assets/usd/go2.usd"
position: [0.0, 0.0, 0.8]
orientation: [0.0, 0.0, 0.0, 1.0]
lidar: False
cameras:
# - prim_path: "/World/Env/GO2/Go2/base/base"
# name: "test_camera"
# type: ["rgb", "distance_to_camera"]
# resolution: [1280, 720]
# interstice_parameters:
# K: [1000, 1000, 500, 500]
# D: [0.1, 0.2, 0.1, 0.0]
# translation: [0.0, 0.0, 0.0] # x, y, z
# orientation: [0.0, 0.0, 0.0, 1.0] # qx, qy, qz, qw

View File

@ -1,119 +0,0 @@
import numpy as np
import omni.replicator.core as rep
from omni.isaac.core.prims import BaseSensor
from omni.isaac.core.prims.xform_prim import XFormPrim
from pxr import UsdGeom
from scipy.spatial.transform import Rotation as R
class AnnotatorManager:
def __init__(self, world):
self.annotators = {}
self.render_products = {}
self.cameras = {}
self.annotator_types = [
"rgb",
"distance_to_camera",
"pointcloud",
"semantic_segmentation",
]
self.camera_prims = {}
self.camera_info = {}
self.world = world
def registerCamera(
self, parent_prim, name, translation, orientation, resolution=(1024, 600)
):
"""
Register a camera in the scene. The camera will be attached to the parent_prim.
parent_prim: The prim path of the parent prim. The camera will be attached to this prim.
name: The name of the camera.
translation: The translation of the camera relative to the parent prim in the form of (x, y, z).
orientation: The orientation of the camera relative to the parent prim in the form of (x, y, z, w).
resolution: The resolution of the camera in the form of (width, height).
"""
qx, qy, qz, qw = tuple(orientation)
xformprim = XFormPrim(
prim_path=parent_prim + "/" + name,
name=name,
translation=tuple(translation),
orientation=(qw, qx, qy, qz), # Omniverse core convention (w, x, y, z)
)
self.camera_prims[name] = xformprim
self.cameras[name] = self.world.stage.DefinePrim(
xformprim.prim_path + "/" + name, "Camera"
)
UsdGeom.Xformable(self.cameras[name]).AddTranslateOp().Set((0.0, 0.0, 0.0))
# Rotate around x for 180 degrees to make the convention consistent with opencv/ROS
UsdGeom.Xformable(self.cameras[name]).AddRotateXYZOp().Set((180.0, 0.0, 0.0))
self.camera_info[name] = {
"translation": translation,
"orientation": orientation,
"resolution": resolution,
}
self.render_products[name] = rep.create.render_product(
str(self.cameras[name].GetPrimPath()), resolution
)
def registerAnnotator(self, type, camera_name):
assert (
type in self.annotator_types
), "Requested replicator type not available. Choose from: " + str(
self.annotator_types
)
assert (
camera_name in self.cameras.keys()
), "The requested camera is not registered. Please register the camera first."
self.annotators[camera_name + ":" + type] = rep.AnnotatorRegistry.get_annotator(
type
)
self.annotators[camera_name + ":" + type].attach(
[self.render_products[camera_name]]
)
def getData(self, annotator_name):
assert (
annotator_name in self.annotators.keys()
), "The requested annotator is not registered. Please register the annotator first."
return self.annotators[annotator_name].get_data()
def getStreams(self):
return list(self.annotators.keys())
def setFocalLength(self, name, value):
assert (
name in self.cameras.keys()
), "The requested camera is not registered. Please register the camera first."
self.cameras[name].GetAttribute("focalLength").Set(value)
def setClippingRange(self, name, min=0.2, max=1000000.0):
assert (
name in self.cameras.keys()
), "The requested camera is not registered. Please register the camera first."
self.cameras[name].GetAttribute("clippingRange").Set((min, max))
# TODO: implement these functions
def getCameraIntrinsics(self, name):
return None
def getCameraExtrinsics(self, name):
return None
def getCameraPose(self, name):
"""
Get the camera pose in the world frame
@param name: (str) The name of the camera
@return: (t, q) The camera pose (np.array([x, y, z]), np.array([qw, qx, qy, qz]))
"""
t, q = self.camera_prims[name].get_world_pose()
return t, q
def setCameraPose(self, name, t, q):
"""
Get the camera pose in the world frame
@param name: (str) The name of the camera
@param t: (mp.array([x, y, z])) The translation of the camera in the world frame
@param q: (mp.array([qw, qx, qy, qz])) The orientation of the camera in the world frame
@return: None
"""
self.camera_prims[name].set_local_pose(t, q)

View File

@ -1,11 +1,7 @@
import time
from copy import deepcopy
import matplotlib.pyplot as plt
import mujoco
import mujoco.viewer
import numpy as np
import pinocchio as pin
from pinocchio.robot_wrapper import RobotWrapper
from Go2Py import ASSETS_PATH
import os
from scipy.spatial.transform import Rotation

View File

@ -1,75 +0,0 @@
import select
import numpy as np
import yaml
from Go2Py.sim.isaacsim.lcm_types.unitree_lowlevel import UnitreeLowCommand, UnitreeLowState
def load_config(file_path):
with open(file_path, "r") as file:
config = yaml.safe_load(file)
return config
class NumpyMemMapDataPipe:
def __init__(self, channel_name, force=False, dtype="uint8", shape=(640, 480, 3)):
self.channel_name = channel_name
self.force = force
self.dtype = dtype
self.shape = shape
if force:
self.shm = np.memmap(
"/dev/shm/" + channel_name, dtype=self.dtype, mode="w+", shape=shape
)
else:
self.shm = np.memmap(
"/dev/shm/" + channel_name, dtype=self.dtype, mode="r+", shape=shape
)
def write(self, data, match_length=True):
if match_length:
self.shm[: data.shape[0], ...] = data
else:
assert (
data.shape == self.shape
), "The data and the shape of the shared memory must match"
self.shm[:] = data
def read(self):
return self.shm.copy()
class simulationManager:
def __init__(self, robot, lcm_server, default_cmd, physics_dt, lcm_timeout=0.01):
self.robot = robot
self.lcm_server = lcm_server
self.missed_ticks = 0
self.default_cmd = default_cmd
self.robot.initialize()
self.reset_required = True
self.lcm_timeout = lcm_timeout
self.physics_dt = physics_dt
self.robot.setCommands(self.default_cmd)
def step(self, timestamp):
# Read the robot's state and send it to the LCM client
state = self.robot.readStates()
state.stamp = timestamp
self.lcm_server.sendStates(state)
# Wait for a response from the LCM client timeout=0.1 second
lcm_cmd = self.lcm_server.getCommands(timeout=self.lcm_timeout)
if lcm_cmd is not None:
self.missed_ticks = 0
# Reset the robot if the communication has been off for too long
if self.reset_required:
self.robot.initialize()
self.robot.setCommands(self.default_cmd)
self.reset_required = False
self.robot.setCommands(lcm_cmd)
else:
self.missed_ticks += 1
if self.missed_ticks > 0.2 / (
self.lcm_timeout + self.physics_dt
): # stopped for more than a second?
self.reset_required = True
self.robot.setCommands(self.default_cmd)

View File

@ -1,4 +1,4 @@
isaac_ros:
docker_start:
@./scripts/run_dev.sh
messages:

View File

@ -17,5 +17,7 @@ rosdep update
# Restart udev daemon
sudo service udev restart
python3 -m pip install --upgrade pip
cd /workspaces/Go2Py/ && python3 -m pip install .
$@

View File

@ -20,4 +20,5 @@ install_requires =
pyyaml
pynput
pin
mujoco
mujoco
cyclonedds