diff --git a/deploy/deploy_real/common/np_math.py b/deploy/deploy_real/common/np_math.py index 610a4e9..0f1dc7c 100644 --- a/deploy/deploy_real/common/np_math.py +++ b/deploy/deploy_real/common/np_math.py @@ -3,6 +3,17 @@ import numpy as np import torch import torch as th +from contextlib import contextmanager +import os + +@contextmanager +def with_dir(d): + d0 = os.getcwd() + try: + os.chdir(d) + yield + finally: + os.chdir(d0) def axis_angle_from_quat(quat: np.ndarray, eps: float = 1.0e-6) -> np.ndarray: """Convert rotations given as quaternions to axis/angle. @@ -170,4 +181,4 @@ def xyzw2wxyz(q_xyzw: th.Tensor, dim: int = -1): def wxyz2xyzw(q_wxyz: th.Tensor, dim: int = -1): if isinstance(q_wxyz, np.ndarray): return np.roll(q_wxyz, -1, axis=dim) - return th.roll(q_wxyz, -1, dims=dim) \ No newline at end of file + return th.roll(q_wxyz, -1, dims=dim) diff --git a/deploy/deploy_real/deploy_real_ros_eetrack.py b/deploy/deploy_real/deploy_real_ros_eetrack.py index 262cde8..04cc9de 100644 --- a/deploy/deploy_real/deploy_real_ros_eetrack.py +++ b/deploy/deploy_real/deploy_real_ros_eetrack.py @@ -22,8 +22,6 @@ import pinocchio as pin from ikctrl import IKCtrl, xyzw2wxyz from yourdfpy import URDF - -from math_utils import * import math_utils import random as rd from act_to_dof import ActToDof @@ -37,140 +35,12 @@ class Mode(Enum): policy = 4 null = 5 - -def axis_angle_from_quat(quat: np.ndarray, eps: float = 1.0e-6) -> np.ndarray: - """Convert rotations given as quaternions to axis/angle. - - Args: - quat: The quaternion orientation in (w, x, y, z). Shape is (..., 4). - eps: The tolerance for Taylor approximation. Defaults to 1.0e-6. - - Returns: - Rotations given as a vector in axis angle form. Shape is (..., 3). - The vector's magnitude is the angle turned anti-clockwise in radians around the vector's direction. - - Reference: - https://github.com/facebookresearch/pytorch3d/blob/main/pytorch3d/transforms/rotation_conversions.py#L526-L554 - """ - # Modified to take in quat as [q_w, q_x, q_y, q_z] - # Quaternion is [q_w, q_x, q_y, q_z] = [cos(theta/2), n_x * sin(theta/2), n_y * sin(theta/2), n_z * sin(theta/2)] - # Axis-angle is [a_x, a_y, a_z] = [theta * n_x, theta * n_y, theta * n_z] - # Thus, axis-angle is [q_x, q_y, q_z] / (sin(theta/2) / theta) - # When theta = 0, (sin(theta/2) / theta) is undefined - # However, as theta --> 0, we can use the Taylor approximation 1/2 - - # theta^2 / 48 - quat = quat * (1.0 - 2.0 * (quat[..., 0:1] < 0.0)) - mag = np.linalg.norm(quat[..., 1:], axis=-1) - half_angle = np.arctan2(mag, quat[..., 0]) - angle = 2.0 * half_angle - # check whether to apply Taylor approximation - sin_half_angles_over_angles = np.where( - np.abs(angle) > eps, - np.sin(half_angle) / angle, - 0.5 - angle * angle / 48 - ) - return quat[..., 1:4] / sin_half_angles_over_angles[..., None] - -def quat_conjugate(q): - return np.concatenate( - (q[..., 0:1], -q[..., 1:]), - axis=-1) - -def quat_from_angle_axis( - angle: torch.Tensor, - axis: torch.Tensor = None) -> torch.Tensor: - """Convert rotations given as angle-axis to quaternions. - - Args: - angle: The angle turned anti-clockwise in radians around the vector's direction. Shape is (N,). - axis: The axis of rotation. Shape is (N, 3). - - Returns: - The quaternion in (w, x, y, z). Shape is (N, 4). - """ - if axis is None: - axa = angle - angle = torch.linalg.norm(axa, dim=-1) - axis = axa / angle[..., None].clamp_min(1e-6) - theta = (angle / 2).unsqueeze(-1) - xyz = normalize(axis) * theta.sin() - w = theta.cos() - return normalize(torch.cat([w, xyz], dim=-1)) - - -def quat_mul(q1: torch.Tensor, q2: torch.Tensor) -> torch.Tensor: - """Multiply two quaternions together. - - Args: - q1: The first quaternion in (w, x, y, z). Shape is (..., 4). - q2: The second quaternion in (w, x, y, z). Shape is (..., 4). - - Returns: - The product of the two quaternions in (w, x, y, z). Shape is (..., 4). - - Raises: - ValueError: Input shapes of ``q1`` and ``q2`` are not matching. - """ - # check input is correct - if q1.shape != q2.shape: - msg = f"Expected input quaternion shape mismatch: {q1.shape} != {q2.shape}." - raise ValueError(msg) - # reshape to (N, 4) for multiplication - shape = q1.shape - q1 = q1.reshape(-1, 4) - q2 = q2.reshape(-1, 4) - # extract components from quaternions - w1, x1, y1, z1 = q1[:, 0], q1[:, 1], q1[:, 2], q1[:, 3] - w2, x2, y2, z2 = q2[:, 0], q2[:, 1], q2[:, 2], q2[:, 3] - # perform multiplication - ww = (z1 + x1) * (x2 + y2) - yy = (w1 - y1) * (w2 + z2) - zz = (w1 + y1) * (w2 - z2) - xx = ww + yy + zz - qq = 0.5 * (xx + (z1 - x1) * (x2 - y2)) - w = qq - ww + (z1 - y1) * (y2 - z2) - x = qq - xx + (x1 + w1) * (x2 + w2) - y = qq - yy + (w1 - x1) * (y2 + z2) - z = qq - zz + (z1 + y1) * (w2 - x2) - - return torch.stack([w, x, y, z], dim=-1).view(shape) - - -def quat_rotate(q: np.ndarray, v: np.ndarray) -> np.ndarray: - """Rotate a vector by a quaternion along the last dimension of q and v. - - Args: - q: The quaternion in (w, x, y, z). Shape is (..., 4). - v: The vector in (x, y, z). Shape is (..., 3). - - Returns: - The rotated vector in (x, y, z). Shape is (..., 3). - """ - q_w = q[..., 0] - q_vec = q[..., 1:] - a = v * (2.0 * q_w**2 - 1.0)[..., None] - b = np.cross(q_vec, v, axis=-1) * q_w[..., None] * 2.0 - c = q_vec * np.einsum("...i,...i->...", q_vec, v)[..., None] * 2.0 - return a + b + c - - -def quat_rotate_inverse(q: np.ndarray, v: np.ndarray) -> np.ndarray: - """Rotate a vector by the inverse of a quaternion along the last dimension of q and v. - - Args: - q: The quaternion in (w, x, y, z). Shape is (..., 4). - v: The vector in (x, y, z). Shape is (..., 3). - - Returns: - The rotated vector in (x, y, z). Shape is (..., 3). - """ - q_w = q[..., 0] - q_vec = q[..., 1:] - a = v * (2.0 * q_w**2 - 1.0)[..., None] - b = np.cross(q_vec, v, axis=-1) * q_w[..., None] * 2.0 - c = q_vec * np.einsum("...i,...i->...", q_vec, v)[..., None] * 2.0 - return a - b + c - +axis_angle_from_quat = math_utils.as_np(math_utils.axis_angle_from_quat) +quat_conjugate = math_utils.as_np(math_utils.quat_conjugate) +quat_mul = math_utils.as_np(math_utils.quat_mul) +quat_rotate = math_utils.as_np(math_utils.quat_rotate) +quat_rotate_inverse = math_utils.as_np(math_utils.quat_rotate_inverse) +wrap_to_pi = math_utils.as_np(math_utils.wrap_to_pi) def body_pose( tf_buffer, @@ -200,7 +70,7 @@ def body_pose( xyz = np.array(xyz) if rot_type == 'axa': axa = axis_angle_from_quat(quat_wxyz) - axa = (axa + np.pi) % (2 * np.pi) - np.pi + axa = wrap_to_pi(axa) return (xyz, axa) elif rot_type == 'quat': return (xyz, quat_wxyz) @@ -281,7 +151,8 @@ class eetrack: self.create_eetrack() self.eetrack_subgoal = self.create_subgoal() self.sg_idx = 0 - self.init_time = rp.time.Time().nanoseconds/1e9 + 1.0 # first subgoal sampling time = 1.0s + # first subgoal sampling time = 1.0s + self.init_time = rp.time.Time().nanoseconds / 1e9 + 1.0 def create_eetrack(self): self.eetrack_start = self.eetrack_midpt.clone() @@ -337,13 +208,13 @@ class eetrack: return torch.cat([eetrack_subgoals, eetrack_ori], dim=2) def update_command(self): - time = self.init_time - rp.time.Time().nanoseconds/1e9 + time = self.init_time - rp.time.Time().nanoseconds / 1e9 if (time >= 0): - self.sg_idx = int( time / 0.1 + 1 ) + self.sg_idx = int(time / 0.1 + 1) # self.sg_idx.clamp_(0, self.number_of_subgoals + 1) - self.sg_idx = min(self.sg_idx, self.number_of_subgoals+1) + self.sg_idx = min(self.sg_idx, self.number_of_subgoals + 1) self.next_command_s_left = self.eetrack_subgoal[..., - self.sg_idx, :] + self.sg_idx, :] def get_command(self, root_state_w): self.update_command() @@ -491,7 +362,7 @@ class Observation: hand_pose, projected_com, joint_pos, - 0.0*joint_vel, + 0.0 * joint_vel, actions, hands_command, right_arm_com, @@ -537,7 +408,7 @@ class Controller: self.lab_from_mot = index_map(self.config.lab_joint, self.config.motor_joint) self.config.default_angles = np.asarray(self.config.lab_joint_offsets)[ - self.lab_from_mot + self.lab_from_mot ] # Data buffers @@ -562,7 +433,9 @@ class Controller: q_pin[self.pin_from_mot] = q_mot default_pose = self.ikctrl.fk(q_pin) xyz = default_pose.translation - quat_wxyz = xyzw2wxyz(pin.Quaternion(default_pose.rotation).coeffs()) + quat_wxyz = xyzw2wxyz( + pin.Quaternion( + default_pose.rotation).coeffs()) self.default_pose = np.concatenate([xyz, quat_wxyz]) self.target_pose = np.copy(self.default_pose) print('default_pose', self.default_pose) @@ -670,7 +543,7 @@ class Controller: # move to default pos if self.counter < self._num_step: alpha = self.counter / self._num_step - #for j in range(self._dof_size): + # for j in range(self._dof_size): for j in range(29): # motor_idx = self._dof_idx[j] # target_pos = self._default_pos[j] @@ -751,37 +624,37 @@ class Controller: if self.eetrack is None: self.eetrack = eetrack(torch.from_numpy(root_state_w)[None], - self.tf_buffer) + self.tf_buffer) if False: _hands_command_ = self.eetrack.get_command( - torch.from_numpy(root_state_w)[None])[0].detach().cpu().numpy() + torch.from_numpy(root_state_w)[None])[0].detach().cpu().numpy() else: _hands_command_ = np.zeros(6) - #_hands_command_[0] = self.cmd[0] * 0.03 - #_hands_command_[2] = self.cmd[1] * 0.03 + # _hands_command_[0] = self.cmd[0] * 0.03 + # _hands_command_[2] = self.cmd[1] * 0.03 if True: - q_mot = [self.low_state.motor_state[i_mot].q for i_mot in range(29)] + q_mot = [self.low_state.motor_state[i_mot].q + for i_mot in range(29)] # print('q_mot (out)', q_mot) q_pin = np.zeros_like(self.ikctrl.cfg.q) q_pin[self.pin_from_mot] = q_mot current_pose = self.ikctrl.fk(q_pin) - _hands_command_ = np.zeros(6) + _hands_command_ = np.zeros(6) _hands_command_[0:3] = (self.target_pose[:3] - - current_pose.translation) + - current_pose.translation) quat_wxyz = xyzw2wxyz(pin.Quaternion( current_pose.rotation).coeffs()) # q_target @ q_current^{-1} d_quat = quat_mul( - torch.from_numpy(self.target_pose[3:7]), - torch.from_numpy(quat_conjugate(quat_wxyz)) + torch.from_numpy(self.target_pose[3:7]), + torch.from_numpy(quat_conjugate(quat_wxyz)) ).detach().cpu().numpy() d_axa = axis_angle_from_quat(d_quat) _hands_command_[3:6] = d_axa # bprint('hands_command', _hands_command_) - # print(_hands_command_) # _hands_command_ = np.zeros(6) @@ -801,23 +674,24 @@ class Controller: obs_tensor = obs_tensor.detach().clone() # hands_command = obs[..., 119:125] + if False: world_from_body_quat = math_utils.quat_from_euler_xyz( - th.as_tensor([0], dtype=th.float32), - th.as_tensor([0], dtype=th.float32), - th.as_tensor([1.57], dtype=th.float32)).reshape(4) + th.as_tensor([0], dtype=th.float32), + th.as_tensor([0], dtype=th.float32), + th.as_tensor([1.57], dtype=th.float32)).reshape(4) obs_tensor[..., 119:122] = math_utils.quat_rotate( - world_from_body_quat[None], - obs_tensor[..., 119:122]) + world_from_body_quat[None], + obs_tensor[..., 119:122]) obs_tensor[..., 122:125] = math_utils.quat_rotate( - world_from_body_quat[None], - obs_tensor[..., 122:125]) + world_from_body_quat[None], + obs_tensor[..., 122:125]) self.action = self.policy(obs_tensor).detach().numpy().squeeze() # np.save(F'{logpath}/act{self.counter:03d}.npy', # self.action) target_dof_pos = self.actmap( - self.obs, - self.action + self.obs, + self.action ) # print('??', # target_dof_pos, diff --git a/deploy/deploy_real/fake_world_tf_pub.py b/deploy/deploy_real/fake_world_tf_pub.py new file mode 100755 index 0000000..7484022 --- /dev/null +++ b/deploy/deploy_real/fake_world_tf_pub.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 + +from pathlib import Path + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile +from unitree_hg.msg import LowState as LowStateHG +from tf2_ros import TransformBroadcaster, TransformStamped + +import numpy as np +import pinocchio as pin +import pink +from common.np_math import (index_map, with_dir) +from math_utils import (as_np, quat_rotate) + +quat_rotate = as_np(quat_rotate) + + +class FakeWorldPublisher(Node): + def __init__(self): + super().__init__('fake_world_publisher') + + urdf_path = '../../resources/robots/g1_description/g1_29dof_with_hand_rev_1_0.urdf' + path = Path(urdf_path) + with with_dir(path.parent): + robot = pin.RobotWrapper.BuildFromURDF(filename=path.name, + package_dirs=["."], + root_joint=None) + self.robot = robot + + self.low_state = LowStateHG() + self.low_state_subscriber = self.create_subscription( + LowStateHG, + 'lowstate', + self.on_low_state, + 10) + self.tf_broadcaster = TransformBroadcaster(self) + + def on_low_state(self, + msg: LowStateHG): + self.low_state = msg + + t = TransformStamped() + + # Read message content and assign it to + # corresponding tf variables + t.header.stamp = self.get_clock().now().to_msg() + t.header.frame_id = 'world' + t.child_frame_id = 'pelvis' + + # Turtle only exists in 2D, thus we get x and y translation + # coordinates from the message and set the z coordinate to 0 + t.transform.translation.x = 0.0 + t.transform.translation.y = 0.0 + t.transform.translation.z = self.pelvis_height(self.low_state) + + # Set world_from_pelvis quaternion based on IMU state + # TODO(ycho): consider applying 90-deg offset? + qw, qx, qy, qz = self.low_state.imu_state.quaternion + t.transform.rotation.x = qx + t.transform.rotation.y = qy + t.transform.rotation.z = qz + t.transform.rotation.w = qw + + # Send the transformation + self.tf_broadcaster.sendTransform(t) + + def pelvis_height(self, low_state: LowStateHG): + robot = self.robot + q_mot = [self.low_state.motor_state[i_mot].q for i_mot in range(29)] + q_pin = np.zeros_like(self.ikctrl.cfg.q) + q_pin[self.pin_from_mot] = q_mot + cfg = pink.Configuration(robot.model, robot.data, q_pin) + + pelvis_from_rf = cfg.get_transform_frame_to_world( + 'right_ankle_roll_link') + pelvis_from_lf = cfg.get_transform_frame_to_world( + 'left_ankle_roll_link') + + # 0.02 = "roll_link" height (approx) + world_from_pelvis_quat = low_state.imu_state.quaternion + pelvis_z_rf = -quat_rotate( + world_from_pelvis_quat, + pelvis_from_rf.translation)[2] + 0.02 + pelvis_z_lf = -quat_rotate( + world_from_pelvis_quat, + pelvis_from_lf.translation)[2] + 0.02 + return 0.5 * pelvis_z_lf + 0.5 * pelvis_z_rf + + +def main(): + rclpy.init() + node = FakeWorldPublisher() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + rclpy.shutdown() diff --git a/deploy/deploy_real/math_utils.py b/deploy/deploy_real/math_utils.py index 23ab9dd..fea005f 100644 --- a/deploy/deploy_real/math_utils.py +++ b/deploy/deploy_real/math_utils.py @@ -11,12 +11,54 @@ from __future__ import annotations import numpy as np import torch import torch.nn.functional -from typing import Literal +import torch as th +from typing import Literal, Mapping, Iterable +from functools import wraps """ General """ +def th2np(x): + if isinstance(x, np.ndarray): + return x + + if isinstance(x, th.Tensor): + return x.numpy() + + if isinstance(x, Mapping): + return {k: th2np(v) for (k,v) in x.items()} + + if isinstance(x, Iterable): + return [th2np(e) for e in x] + return x + +def np2th(x): + if isinstance(x, th.Tensor): + return x + + if isinstance(x, np.ndarray): + return th.from_numpy(x) + + if isinstance(x, Mapping): + return {k: np2th(v) for (k,v) in x.items()} + + if isinstance(x, Iterable): + return [np2th(e) for e in x] + + return x + +def as_np(func): + @wraps(func) + def np_func(*args, **kwds): + th_args = np2th(args) + th_kwds = np2th(kwds) + th_out = func(*th_args, **th_kwds) + # TODO(ycho): consider interoperable functions, + # i.e., numpy in -> numpy out; torch in -> torch out + np_out = th2np(th_out) + return np_out + return np_func @torch.jit.script def scale_transform( @@ -1566,3 +1608,16 @@ def slerp(q0: torch.Tensor, q1: torch.Tensor, steps: torch.Tensor): diff_quat = safe_rotvec2quat(diff) return quat_mul(q0, diff_quat) + +def main(): + quat = np.random.normal(size=4) + quat /= np.linalg.norm(quat) + axa = as_np(axis_angle_from_quat)(quat) + print(axa) + + axa_th = axis_angle_from_quat(th.as_tensor(quat, + device='cpu')) + print(axa_th.detach().cpu().numpy()) + +if __name__ == '__main__': + main() diff --git a/deploy/deploy_real/state_pub.py b/deploy/deploy_real/state_pub.py index c966985..253189c 100644 --- a/deploy/deploy_real/state_pub.py +++ b/deploy/deploy_real/state_pub.py @@ -26,12 +26,6 @@ class StatePublisher(Node): self.get_logger().info("{0} started".format(self.nodeName)) self.joint_state = JointState() - # def wait_for_low_state(self): - # while self.low_state.crc == 0: - # print(self.low_state) - # time.sleep(self.config.control_dt) - # print("Successfully connected to the robot.") - def on_low_state(self, msg: LowStateHG): self.low_state = msg joint_state = self.joint_state @@ -40,11 +34,10 @@ class StatePublisher(Node): joint_state.name = self.joint_names joint_state.position = [0.0 for _ in self.joint_names] - # print('a', len(self.joint_names)) # 29 - # print('b', len(self.low_state.motor_state)) # 35?? n:int = min(len(self.joint_names), len(self.low_state.motor_state)) for i in range(n): joint_state.position[i] = self.low_state.motor_state[i].q + joint_state.velocity[i] = self.low_state.motor_state[i].dq self.joint_pub.publish(joint_state) def run(self): @@ -67,4 +60,4 @@ def main(): rclpy.shutdown() if __name__ == '__main__': - main() \ No newline at end of file + main()