Go2Py/examples/dds_draft.ipynb

378 lines
11 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cyclonedds.domain import DomainParticipant\n",
"from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.pub import DataWriter"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"participant = DomainParticipant()\n",
"topic = Topic(participant, \"rt/go2/lowcmd\", Go2pyLowCmd_)\n",
"writer = DataWriter(participant, topic)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"msg = Go2pyLowCmd_(np.zeros(12),\n",
"np.zeros(12),np.zeros(12),np.zeros(12),np.zeros(12))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"while True:\n",
" writer.write(msg)\n",
" time.sleep(0.001)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from Go2Py.std_msgs.msg.dds_ import String_\n",
"participant = DomainParticipant()\n",
"topic = Topic(participant, \"rt/test\", String_)\n",
"writer = DataWriter(participant, topic)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"msg = String_(\"Hello\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"while True:\n",
" writer.write(msg)\n",
" time.sleep(0.001)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from cyclonedds.domain import DomainParticipant\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.sub import DataReader\n",
"from cyclonedds.util import duration\n",
"from Go2Py.unitree_go.msg.dds_ import LowState_"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"participant = DomainParticipant()\n",
"topic = Topic(participant, 'rt/lowstate', LowState_)\n",
"reader = DataReader(participant, topic)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for msg in reader.take_iter(timeout=duration(milliseconds=100.)):\n",
" print(msg)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"reader.take(timeout=duration(milliseconds=100.))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import struct\n",
"import threading\n",
"import time\n",
"import numpy as np\n",
"import numpy.linalg as LA\n",
"from scipy.spatial.transform import Rotation as R\n",
"\n",
"from cyclonedds.domain import DomainParticipant\n",
"from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.pub import DataWriter\n",
"\n",
"from cyclonedds.domain import DomainParticipant\n",
"from cyclonedds.topic import Topic\n",
"from cyclonedds.sub import DataReader\n",
"from cyclonedds.util import duration\n",
"from Go2Py.unitree_go.msg.dds_ import LowState_\n",
"from threading import Thread"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class GO2Real():\n",
" def __init__(\n",
" self,\n",
" mode = 'lowlevel', # 'highlevel' or 'lowlevel'\n",
" vx_max=0.5,\n",
" vy_max=0.4,\n",
" ωz_max=0.5,\n",
" ):\n",
" assert mode in ['highlevel', 'lowlevel'], \"mode should be either 'highlevel' or 'lowlevel'\"\n",
" self.mode = mode\n",
" if self.mode == 'highlevel':\n",
" raise NotImplementedError('DDS interface for the highlevel commands is not implemented yet. Please use our ROS2 interface.')\n",
"\n",
" self.highcmd_topic_name = \"rt/go2/twist_cmd\"\n",
" self.lowcmd_topic_name = \"rt/go2/lowcmd\"\n",
" self.lowstate_topic_name = \"rt/lowstate\"\n",
"\n",
" self.participant = DomainParticipant()\n",
" self.lowstate_topic = Topic(self.participant, self.lowstate_topic_name, LowState_)\n",
" self.lowstate_reader = DataReader(self.participant, self.lowstate_topic)\n",
" \n",
" self.lowcmd_topic = Topic(self.participant, self.lowcmd_topic_name, Go2pyLowCmd_)\n",
" self.lowcmd_writer = DataWriter(self.participant, self.lowcmd_topic)\n",
"\n",
" self.state = None\n",
" self.setCommands = {'lowlevel':self.setCommandsLow}[self.mode]\n",
" self.lowstate_thread = Thread(target = self.lowstate_update)\n",
" self.running = True\n",
" self.lowstate_thread.start()\n",
"\n",
" def lowstate_update(self):\n",
" \"\"\"\n",
" Retrieve the state of the robot\n",
" \"\"\"\n",
" while self.running:\n",
" for msg in self.lowstate_reader.take_iter(timeout=duration(milliseconds=100.)):\n",
" self.state = msg\n",
"\n",
" def getIMU(self):\n",
" accel = self.state.imu_state.accelerometer\n",
" gyro = self.state.imu_state.gyroscope\n",
" quat = self.state.imu_state.quaternion\n",
" rpy = self.state.imu_state.rpy\n",
" temp = self.state.imu_state.temperature\n",
" return {'accel':accel, 'gyro':gyro, 'quat':quat, \"rpy\":rpy, 'temp':temp}\n",
"\n",
" def getFootContacts(self):\n",
" \"\"\"Returns the raw foot contact forces\"\"\"\n",
" footContacts = self.state.foot_force \n",
" return np.array(footContacts)\n",
"\n",
" def getJointStates(self):\n",
" \"\"\"Returns the joint angles (q) and velocities (dq) of the robot\"\"\"\n",
" motor_state = np.array([[self.state.motor_state[i].q,\n",
" robot.state.motor_state[i].dq,\n",
" robot.state.motor_state[i].ddq,\n",
" robot.state.motor_state[i].tau_est,\n",
" robot.state.motor_state[i].temperature] for i in range(12)])\n",
" return {'q':motor_state[:,0], \n",
" 'dq':motor_state[:,1],\n",
" 'ddq':motor_state[:,2],\n",
" 'tau_est':motor_state[:,3],\n",
" 'temperature':motor_state[:,4]}\n",
"\n",
" def getRemoteState(self):\n",
" \"\"\"A method to get the state of the wireless remote control. \n",
" Returns a xRockerBtn object: \n",
" - head: [head1, head2]\n",
" - keySwitch: xKeySwitch object\n",
" - lx: float\n",
" - rx: float\n",
" - ry: float\n",
" - L2: float\n",
" - ly: float\n",
" \"\"\"\n",
" wirelessRemote = self.state.wireless_remote[:24]\n",
" binary_data = bytes(wirelessRemote)\n",
"\n",
" format_str = \"<2BH5f\"\n",
" data = struct.unpack(format_str, binary_data)\n",
"\n",
" head = list(data[:2])\n",
" lx = data[3]\n",
" rx = data[4]\n",
" ry = data[5]\n",
" L2 = data[6]\n",
" ly = data[7]\n",
"\n",
" _btn = bin(data[2])[2:].zfill(16)\n",
" btn = [int(char) for char in _btn]\n",
" btn.reverse()\n",
"\n",
" keySwitch = xKeySwitch(*btn)\n",
" rockerBtn = xRockerBtn(head, keySwitch, lx, rx, ry, L2, ly)\n",
" return rockerBtn\n",
"\n",
" def getCommandFromRemote(self):\n",
" \"\"\"Do not use directly for control!!!\"\"\"\n",
" rockerBtn = self.getRemoteState()\n",
"\n",
" lx = rockerBtn.lx\n",
" ly = rockerBtn.ly\n",
" rx = rockerBtn.rx\n",
"\n",
" v_x = ly * self.vx_max\n",
" v_y = lx * self.vy_max\n",
" ω = rx * self.ωz_max\n",
" \n",
" return v_x, v_y, ω\n",
"\n",
" def getBatteryState(self):\n",
" \"\"\"Returns the battery percentage of the robot\"\"\"\n",
" batteryState = self.state.bms_state\n",
" return batteryState.soc\n",
"\n",
" def setCommandsHigh(self, v_x, v_y, ω_z, bodyHeight=0.0, footRaiseHeight=0.0, mode=2):\n",
" self.cmd_watchdog_timer = time.time()\n",
" _v_x, _v_y, _ω_z = self.clip_velocity(v_x, v_y, ω_z)\n",
" self.highcmd.header.stamp = self.get_clock().now().to_msg()\n",
" self.highcmd.header.frame_id = \"base_link\"\n",
" self.highcmd.twist.linear.x = _v_x\n",
" self.highcmd.twist.linear.y = _v_y\n",
" self.highcmd.twist.angular.z = _ω_z\n",
" self.highcmd_publisher.publish(self.highcmd)\n",
"\n",
" def setCommandsLow(self, q, dq, kp, kd, tau_ff):\n",
" assert q.size == dq.size == kp.size == kd.size == tau_ff.size == 12, \"q, dq, kp, kd, tau_ff should have size 12\"\n",
" lowcmd = Go2pyLowCmd_(\n",
" q,\n",
" dq, \n",
" kp,\n",
" kd,\n",
" tau_ff\n",
" )\n",
" self.lowcmd_writer.write(lowcmd)\n",
"\n",
" def close(self):\n",
" self.running = False\n",
"\n",
" def check_calf_collision(self, q):\n",
" self.pin_robot.update(q)\n",
" in_collision = self.pin_robot.check_calf_collision(q)\n",
" return in_collision\n",
"\n",
" def clip_velocity(self, v_x, v_y, ω_z):\n",
" _v = np.array([[v_x], [v_y]])\n",
" _scale = np.sqrt(_v.T @ self.P_v_max @ _v)[0, 0]\n",
"\n",
" if _scale > 1.0:\n",
" scale = 1.0 / _scale\n",
" else:\n",
" scale = 1.0\n",
"\n",
" return scale * v_x, scale * v_y, np.clip(ω_z, self.ωz_min, self.ωz_max)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"robot.getBatteryState()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from Go2Py.robot.interface.dds import GO2Real\n",
"robot = GO2Real()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"robot.getIMU()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}