2024-03-11 06:27:42 +08:00
|
|
|
{
|
|
|
|
"cells": [
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"from cyclonedds.domain import DomainParticipant\n",
|
|
|
|
"from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n",
|
|
|
|
"from cyclonedds.topic import Topic\n",
|
|
|
|
"from cyclonedds.pub import DataWriter"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"participant = DomainParticipant()\n",
|
|
|
|
"topic = Topic(participant, \"rt/go2/lowcmd\", Go2pyLowCmd_)\n",
|
|
|
|
"writer = DataWriter(participant, topic)"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"import numpy as np\n",
|
|
|
|
"msg = Go2pyLowCmd_(np.zeros(12),\n",
|
|
|
|
"np.zeros(12),np.zeros(12),np.zeros(12),np.zeros(12))"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"import time\n",
|
|
|
|
"while True:\n",
|
|
|
|
" writer.write(msg)\n",
|
|
|
|
" time.sleep(0.001)"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"from Go2Py.std_msgs.msg.dds_ import String_\n",
|
|
|
|
"participant = DomainParticipant()\n",
|
|
|
|
"topic = Topic(participant, \"rt/test\", String_)\n",
|
|
|
|
"writer = DataWriter(participant, topic)"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"msg = String_(\"Hello\")"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"import time\n",
|
|
|
|
"while True:\n",
|
|
|
|
" writer.write(msg)\n",
|
|
|
|
" time.sleep(0.001)"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"from cyclonedds.domain import DomainParticipant\n",
|
|
|
|
"from cyclonedds.topic import Topic\n",
|
|
|
|
"from cyclonedds.sub import DataReader\n",
|
|
|
|
"from cyclonedds.util import duration\n",
|
|
|
|
"from Go2Py.unitree_go.msg.dds_ import LowState_"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"participant = DomainParticipant()\n",
|
|
|
|
"topic = Topic(participant, 'rt/lowstate', LowState_)\n",
|
|
|
|
"reader = DataReader(participant, topic)"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"for msg in reader.take_iter(timeout=duration(milliseconds=100.)):\n",
|
|
|
|
" print(msg)"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"reader.take(timeout=duration(milliseconds=100.))"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
2024-03-11 06:34:53 +08:00
|
|
|
"execution_count": null,
|
2024-03-11 06:27:42 +08:00
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"import struct\n",
|
|
|
|
"import threading\n",
|
|
|
|
"import time\n",
|
|
|
|
"import numpy as np\n",
|
|
|
|
"import numpy.linalg as LA\n",
|
|
|
|
"from scipy.spatial.transform import Rotation as R\n",
|
|
|
|
"\n",
|
|
|
|
"from cyclonedds.domain import DomainParticipant\n",
|
|
|
|
"from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n",
|
|
|
|
"from cyclonedds.topic import Topic\n",
|
|
|
|
"from cyclonedds.pub import DataWriter\n",
|
|
|
|
"\n",
|
|
|
|
"from cyclonedds.domain import DomainParticipant\n",
|
|
|
|
"from cyclonedds.topic import Topic\n",
|
|
|
|
"from cyclonedds.sub import DataReader\n",
|
|
|
|
"from cyclonedds.util import duration\n",
|
|
|
|
"from Go2Py.unitree_go.msg.dds_ import LowState_\n",
|
|
|
|
"from threading import Thread"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
2024-03-11 06:34:53 +08:00
|
|
|
"execution_count": null,
|
2024-03-11 06:27:42 +08:00
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
|
|
|
"class GO2Real():\n",
|
|
|
|
" def __init__(\n",
|
|
|
|
" self,\n",
|
|
|
|
" mode = 'lowlevel', # 'highlevel' or 'lowlevel'\n",
|
|
|
|
" vx_max=0.5,\n",
|
|
|
|
" vy_max=0.4,\n",
|
|
|
|
" ωz_max=0.5,\n",
|
|
|
|
" ):\n",
|
|
|
|
" assert mode in ['highlevel', 'lowlevel'], \"mode should be either 'highlevel' or 'lowlevel'\"\n",
|
|
|
|
" self.mode = mode\n",
|
|
|
|
" if self.mode == 'highlevel':\n",
|
|
|
|
" raise NotImplementedError('DDS interface for the highlevel commands is not implemented yet. Please use our ROS2 interface.')\n",
|
|
|
|
"\n",
|
|
|
|
" self.highcmd_topic_name = \"rt/go2/twist_cmd\"\n",
|
|
|
|
" self.lowcmd_topic_name = \"rt/go2/lowcmd\"\n",
|
|
|
|
" self.lowstate_topic_name = \"rt/lowstate\"\n",
|
|
|
|
"\n",
|
|
|
|
" self.participant = DomainParticipant()\n",
|
|
|
|
" self.lowstate_topic = Topic(self.participant, self.lowstate_topic_name, LowState_)\n",
|
|
|
|
" self.lowstate_reader = DataReader(self.participant, self.lowstate_topic)\n",
|
|
|
|
" \n",
|
|
|
|
" self.lowcmd_topic = Topic(self.participant, self.lowcmd_topic_name, Go2pyLowCmd_)\n",
|
|
|
|
" self.lowcmd_writer = DataWriter(self.participant, self.lowcmd_topic)\n",
|
|
|
|
"\n",
|
|
|
|
" self.state = None\n",
|
|
|
|
" self.setCommands = {'lowlevel':self.setCommandsLow}[self.mode]\n",
|
|
|
|
" self.lowstate_thread = Thread(target = self.lowstate_update)\n",
|
|
|
|
" self.running = True\n",
|
|
|
|
" self.lowstate_thread.start()\n",
|
|
|
|
"\n",
|
|
|
|
" def lowstate_update(self):\n",
|
|
|
|
" \"\"\"\n",
|
|
|
|
" Retrieve the state of the robot\n",
|
|
|
|
" \"\"\"\n",
|
|
|
|
" while self.running:\n",
|
|
|
|
" for msg in self.lowstate_reader.take_iter(timeout=duration(milliseconds=100.)):\n",
|
|
|
|
" self.state = msg\n",
|
|
|
|
"\n",
|
|
|
|
" def getIMU(self):\n",
|
|
|
|
" accel = self.state.imu_state.accelerometer\n",
|
|
|
|
" gyro = self.state.imu_state.gyroscope\n",
|
|
|
|
" quat = self.state.imu_state.quaternion\n",
|
|
|
|
" rpy = self.state.imu_state.rpy\n",
|
|
|
|
" temp = self.state.imu_state.temperature\n",
|
|
|
|
" return {'accel':accel, 'gyro':gyro, 'quat':quat, \"rpy\":rpy, 'temp':temp}\n",
|
|
|
|
"\n",
|
|
|
|
" def getFootContacts(self):\n",
|
|
|
|
" \"\"\"Returns the raw foot contact forces\"\"\"\n",
|
|
|
|
" footContacts = self.state.foot_force \n",
|
|
|
|
" return np.array(footContacts)\n",
|
|
|
|
"\n",
|
|
|
|
" def getJointStates(self):\n",
|
|
|
|
" \"\"\"Returns the joint angles (q) and velocities (dq) of the robot\"\"\"\n",
|
|
|
|
" motor_state = np.array([[self.state.motor_state[i].q,\n",
|
|
|
|
" robot.state.motor_state[i].dq,\n",
|
|
|
|
" robot.state.motor_state[i].ddq,\n",
|
|
|
|
" robot.state.motor_state[i].tau_est,\n",
|
|
|
|
" robot.state.motor_state[i].temperature] for i in range(12)])\n",
|
|
|
|
" return {'q':motor_state[:,0], \n",
|
|
|
|
" 'dq':motor_state[:,1],\n",
|
|
|
|
" 'ddq':motor_state[:,2],\n",
|
|
|
|
" 'tau_est':motor_state[:,3],\n",
|
|
|
|
" 'temperature':motor_state[:,4]}\n",
|
|
|
|
"\n",
|
|
|
|
" def getRemoteState(self):\n",
|
|
|
|
" \"\"\"A method to get the state of the wireless remote control. \n",
|
|
|
|
" Returns a xRockerBtn object: \n",
|
|
|
|
" - head: [head1, head2]\n",
|
|
|
|
" - keySwitch: xKeySwitch object\n",
|
|
|
|
" - lx: float\n",
|
|
|
|
" - rx: float\n",
|
|
|
|
" - ry: float\n",
|
|
|
|
" - L2: float\n",
|
|
|
|
" - ly: float\n",
|
|
|
|
" \"\"\"\n",
|
|
|
|
" wirelessRemote = self.state.wireless_remote[:24]\n",
|
|
|
|
" binary_data = bytes(wirelessRemote)\n",
|
|
|
|
"\n",
|
|
|
|
" format_str = \"<2BH5f\"\n",
|
|
|
|
" data = struct.unpack(format_str, binary_data)\n",
|
|
|
|
"\n",
|
|
|
|
" head = list(data[:2])\n",
|
|
|
|
" lx = data[3]\n",
|
|
|
|
" rx = data[4]\n",
|
|
|
|
" ry = data[5]\n",
|
|
|
|
" L2 = data[6]\n",
|
|
|
|
" ly = data[7]\n",
|
|
|
|
"\n",
|
|
|
|
" _btn = bin(data[2])[2:].zfill(16)\n",
|
|
|
|
" btn = [int(char) for char in _btn]\n",
|
|
|
|
" btn.reverse()\n",
|
|
|
|
"\n",
|
|
|
|
" keySwitch = xKeySwitch(*btn)\n",
|
|
|
|
" rockerBtn = xRockerBtn(head, keySwitch, lx, rx, ry, L2, ly)\n",
|
|
|
|
" return rockerBtn\n",
|
|
|
|
"\n",
|
|
|
|
" def getCommandFromRemote(self):\n",
|
|
|
|
" \"\"\"Do not use directly for control!!!\"\"\"\n",
|
|
|
|
" rockerBtn = self.getRemoteState()\n",
|
|
|
|
"\n",
|
|
|
|
" lx = rockerBtn.lx\n",
|
|
|
|
" ly = rockerBtn.ly\n",
|
|
|
|
" rx = rockerBtn.rx\n",
|
|
|
|
"\n",
|
|
|
|
" v_x = ly * self.vx_max\n",
|
|
|
|
" v_y = lx * self.vy_max\n",
|
|
|
|
" ω = rx * self.ωz_max\n",
|
|
|
|
" \n",
|
|
|
|
" return v_x, v_y, ω\n",
|
|
|
|
"\n",
|
|
|
|
" def getBatteryState(self):\n",
|
|
|
|
" \"\"\"Returns the battery percentage of the robot\"\"\"\n",
|
|
|
|
" batteryState = self.state.bms_state\n",
|
|
|
|
" return batteryState.soc\n",
|
|
|
|
"\n",
|
|
|
|
" def setCommandsHigh(self, v_x, v_y, ω_z, bodyHeight=0.0, footRaiseHeight=0.0, mode=2):\n",
|
|
|
|
" self.cmd_watchdog_timer = time.time()\n",
|
|
|
|
" _v_x, _v_y, _ω_z = self.clip_velocity(v_x, v_y, ω_z)\n",
|
|
|
|
" self.highcmd.header.stamp = self.get_clock().now().to_msg()\n",
|
|
|
|
" self.highcmd.header.frame_id = \"base_link\"\n",
|
|
|
|
" self.highcmd.twist.linear.x = _v_x\n",
|
|
|
|
" self.highcmd.twist.linear.y = _v_y\n",
|
|
|
|
" self.highcmd.twist.angular.z = _ω_z\n",
|
|
|
|
" self.highcmd_publisher.publish(self.highcmd)\n",
|
|
|
|
"\n",
|
|
|
|
" def setCommandsLow(self, q, dq, kp, kd, tau_ff):\n",
|
|
|
|
" assert q.size == dq.size == kp.size == kd.size == tau_ff.size == 12, \"q, dq, kp, kd, tau_ff should have size 12\"\n",
|
|
|
|
" lowcmd = Go2pyLowCmd_(\n",
|
|
|
|
" q,\n",
|
|
|
|
" dq, \n",
|
|
|
|
" kp,\n",
|
|
|
|
" kd,\n",
|
|
|
|
" tau_ff\n",
|
|
|
|
" )\n",
|
|
|
|
" self.lowcmd_writer.write(lowcmd)\n",
|
|
|
|
"\n",
|
|
|
|
" def close(self):\n",
|
|
|
|
" self.running = False\n",
|
|
|
|
"\n",
|
|
|
|
" def check_calf_collision(self, q):\n",
|
|
|
|
" self.pin_robot.update(q)\n",
|
|
|
|
" in_collision = self.pin_robot.check_calf_collision(q)\n",
|
|
|
|
" return in_collision\n",
|
|
|
|
"\n",
|
|
|
|
" def clip_velocity(self, v_x, v_y, ω_z):\n",
|
|
|
|
" _v = np.array([[v_x], [v_y]])\n",
|
|
|
|
" _scale = np.sqrt(_v.T @ self.P_v_max @ _v)[0, 0]\n",
|
|
|
|
"\n",
|
|
|
|
" if _scale > 1.0:\n",
|
|
|
|
" scale = 1.0 / _scale\n",
|
|
|
|
" else:\n",
|
|
|
|
" scale = 1.0\n",
|
|
|
|
"\n",
|
|
|
|
" return scale * v_x, scale * v_y, np.clip(ω_z, self.ωz_min, self.ωz_max)"
|
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
2024-03-11 06:34:53 +08:00
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": []
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
2024-03-11 06:27:42 +08:00
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": [
|
2024-03-11 06:34:53 +08:00
|
|
|
"robot.getBatteryState()"
|
2024-03-11 06:27:42 +08:00
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
2024-03-11 06:34:53 +08:00
|
|
|
"execution_count": null,
|
2024-03-11 06:27:42 +08:00
|
|
|
"metadata": {},
|
2024-03-11 06:34:53 +08:00
|
|
|
"outputs": [],
|
2024-03-11 06:27:42 +08:00
|
|
|
"source": [
|
2024-03-11 06:34:53 +08:00
|
|
|
"from Go2Py.robot.interface.dds import GO2Real\n",
|
|
|
|
"robot = GO2Real()"
|
2024-03-11 06:27:42 +08:00
|
|
|
]
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
2024-03-11 06:34:53 +08:00
|
|
|
"execution_count": null,
|
2024-03-11 06:27:42 +08:00
|
|
|
"metadata": {},
|
2024-03-11 06:34:53 +08:00
|
|
|
"outputs": [],
|
2024-03-11 06:27:42 +08:00
|
|
|
"source": [
|
2024-03-11 06:34:53 +08:00
|
|
|
"robot.getIMU()"
|
2024-03-11 06:27:42 +08:00
|
|
|
]
|
2024-03-11 06:34:53 +08:00
|
|
|
},
|
|
|
|
{
|
|
|
|
"cell_type": "code",
|
|
|
|
"execution_count": null,
|
|
|
|
"metadata": {},
|
|
|
|
"outputs": [],
|
|
|
|
"source": []
|
2024-03-11 06:27:42 +08:00
|
|
|
}
|
|
|
|
],
|
|
|
|
"metadata": {
|
|
|
|
"kernelspec": {
|
|
|
|
"display_name": "Python 3",
|
|
|
|
"language": "python",
|
|
|
|
"name": "python3"
|
|
|
|
},
|
|
|
|
"language_info": {
|
|
|
|
"codemirror_mode": {
|
|
|
|
"name": "ipython",
|
|
|
|
"version": 3
|
|
|
|
},
|
|
|
|
"file_extension": ".py",
|
|
|
|
"mimetype": "text/x-python",
|
|
|
|
"name": "python",
|
|
|
|
"nbconvert_exporter": "python",
|
|
|
|
"pygments_lexer": "ipython3",
|
|
|
|
"version": "3.8.10"
|
|
|
|
}
|
|
|
|
},
|
|
|
|
"nbformat": 4,
|
|
|
|
"nbformat_minor": 2
|
|
|
|
}
|