{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from cyclonedds.domain import DomainParticipant\n", "from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n", "from cyclonedds.topic import Topic\n", "from cyclonedds.pub import DataWriter" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "participant = DomainParticipant()\n", "topic = Topic(participant, \"rt/go2/lowcmd\", Go2pyLowCmd_)\n", "writer = DataWriter(participant, topic)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "msg = Go2pyLowCmd_(np.zeros(12),\n", "np.zeros(12),np.zeros(12),np.zeros(12),np.zeros(12))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "while True:\n", " writer.write(msg)\n", " time.sleep(0.001)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from Go2Py.std_msgs.msg.dds_ import String_\n", "participant = DomainParticipant()\n", "topic = Topic(participant, \"rt/test\", String_)\n", "writer = DataWriter(participant, topic)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "msg = String_(\"Hello\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "while True:\n", " writer.write(msg)\n", " time.sleep(0.001)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from cyclonedds.domain import DomainParticipant\n", "from cyclonedds.topic import Topic\n", "from cyclonedds.sub import DataReader\n", "from cyclonedds.util import duration\n", "from Go2Py.unitree_go.msg.dds_ import LowState_" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "participant = DomainParticipant()\n", "topic = Topic(participant, 'rt/lowstate', LowState_)\n", "reader = DataReader(participant, topic)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for msg in reader.take_iter(timeout=duration(milliseconds=100.)):\n", " print(msg)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "reader.take(timeout=duration(milliseconds=100.))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import struct\n", "import threading\n", "import time\n", "import numpy as np\n", "import numpy.linalg as LA\n", "from scipy.spatial.transform import Rotation as R\n", "\n", "from cyclonedds.domain import DomainParticipant\n", "from Go2Py.unitree_go.msg.dds_ import Go2pyLowCmd_\n", "from cyclonedds.topic import Topic\n", "from cyclonedds.pub import DataWriter\n", "\n", "from cyclonedds.domain import DomainParticipant\n", "from cyclonedds.topic import Topic\n", "from cyclonedds.sub import DataReader\n", "from cyclonedds.util import duration\n", "from Go2Py.unitree_go.msg.dds_ import LowState_\n", "from threading import Thread" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class GO2Real():\n", " def __init__(\n", " self,\n", " mode = 'lowlevel', # 'highlevel' or 'lowlevel'\n", " vx_max=0.5,\n", " vy_max=0.4,\n", " ωz_max=0.5,\n", " ):\n", " assert mode in ['highlevel', 'lowlevel'], \"mode should be either 'highlevel' or 'lowlevel'\"\n", " self.mode = mode\n", " if self.mode == 'highlevel':\n", " raise NotImplementedError('DDS interface for the highlevel commands is not implemented yet. Please use our ROS2 interface.')\n", "\n", " self.highcmd_topic_name = \"rt/go2/twist_cmd\"\n", " self.lowcmd_topic_name = \"rt/go2/lowcmd\"\n", " self.lowstate_topic_name = \"rt/lowstate\"\n", "\n", " self.participant = DomainParticipant()\n", " self.lowstate_topic = Topic(self.participant, self.lowstate_topic_name, LowState_)\n", " self.lowstate_reader = DataReader(self.participant, self.lowstate_topic)\n", " \n", " self.lowcmd_topic = Topic(self.participant, self.lowcmd_topic_name, Go2pyLowCmd_)\n", " self.lowcmd_writer = DataWriter(self.participant, self.lowcmd_topic)\n", "\n", " self.state = None\n", " self.setCommands = {'lowlevel':self.setCommandsLow}[self.mode]\n", " self.lowstate_thread = Thread(target = self.lowstate_update)\n", " self.running = True\n", " self.lowstate_thread.start()\n", "\n", " def lowstate_update(self):\n", " \"\"\"\n", " Retrieve the state of the robot\n", " \"\"\"\n", " while self.running:\n", " for msg in self.lowstate_reader.take_iter(timeout=duration(milliseconds=100.)):\n", " self.state = msg\n", "\n", " def getIMU(self):\n", " accel = self.state.imu_state.accelerometer\n", " gyro = self.state.imu_state.gyroscope\n", " quat = self.state.imu_state.quaternion\n", " rpy = self.state.imu_state.rpy\n", " temp = self.state.imu_state.temperature\n", " return {'accel':accel, 'gyro':gyro, 'quat':quat, \"rpy\":rpy, 'temp':temp}\n", "\n", " def getFootContacts(self):\n", " \"\"\"Returns the raw foot contact forces\"\"\"\n", " footContacts = self.state.foot_force \n", " return np.array(footContacts)\n", "\n", " def getJointStates(self):\n", " \"\"\"Returns the joint angles (q) and velocities (dq) of the robot\"\"\"\n", " motor_state = np.array([[self.state.motor_state[i].q,\n", " robot.state.motor_state[i].dq,\n", " robot.state.motor_state[i].ddq,\n", " robot.state.motor_state[i].tau_est,\n", " robot.state.motor_state[i].temperature] for i in range(12)])\n", " return {'q':motor_state[:,0], \n", " 'dq':motor_state[:,1],\n", " 'ddq':motor_state[:,2],\n", " 'tau_est':motor_state[:,3],\n", " 'temperature':motor_state[:,4]}\n", "\n", " def getRemoteState(self):\n", " \"\"\"A method to get the state of the wireless remote control. \n", " Returns a xRockerBtn object: \n", " - head: [head1, head2]\n", " - keySwitch: xKeySwitch object\n", " - lx: float\n", " - rx: float\n", " - ry: float\n", " - L2: float\n", " - ly: float\n", " \"\"\"\n", " wirelessRemote = self.state.wireless_remote[:24]\n", " binary_data = bytes(wirelessRemote)\n", "\n", " format_str = \"<2BH5f\"\n", " data = struct.unpack(format_str, binary_data)\n", "\n", " head = list(data[:2])\n", " lx = data[3]\n", " rx = data[4]\n", " ry = data[5]\n", " L2 = data[6]\n", " ly = data[7]\n", "\n", " _btn = bin(data[2])[2:].zfill(16)\n", " btn = [int(char) for char in _btn]\n", " btn.reverse()\n", "\n", " keySwitch = xKeySwitch(*btn)\n", " rockerBtn = xRockerBtn(head, keySwitch, lx, rx, ry, L2, ly)\n", " return rockerBtn\n", "\n", " def getCommandFromRemote(self):\n", " \"\"\"Do not use directly for control!!!\"\"\"\n", " rockerBtn = self.getRemoteState()\n", "\n", " lx = rockerBtn.lx\n", " ly = rockerBtn.ly\n", " rx = rockerBtn.rx\n", "\n", " v_x = ly * self.vx_max\n", " v_y = lx * self.vy_max\n", " ω = rx * self.ωz_max\n", " \n", " return v_x, v_y, ω\n", "\n", " def getBatteryState(self):\n", " \"\"\"Returns the battery percentage of the robot\"\"\"\n", " batteryState = self.state.bms_state\n", " return batteryState.soc\n", "\n", " def setCommandsHigh(self, v_x, v_y, ω_z, bodyHeight=0.0, footRaiseHeight=0.0, mode=2):\n", " self.cmd_watchdog_timer = time.time()\n", " _v_x, _v_y, _ω_z = self.clip_velocity(v_x, v_y, ω_z)\n", " self.highcmd.header.stamp = self.get_clock().now().to_msg()\n", " self.highcmd.header.frame_id = \"base_link\"\n", " self.highcmd.twist.linear.x = _v_x\n", " self.highcmd.twist.linear.y = _v_y\n", " self.highcmd.twist.angular.z = _ω_z\n", " self.highcmd_publisher.publish(self.highcmd)\n", "\n", " def setCommandsLow(self, q, dq, kp, kd, tau_ff):\n", " assert q.size == dq.size == kp.size == kd.size == tau_ff.size == 12, \"q, dq, kp, kd, tau_ff should have size 12\"\n", " lowcmd = Go2pyLowCmd_(\n", " q,\n", " dq, \n", " kp,\n", " kd,\n", " tau_ff\n", " )\n", " self.lowcmd_writer.write(lowcmd)\n", "\n", " def close(self):\n", " self.running = False\n", "\n", " def check_calf_collision(self, q):\n", " self.pin_robot.update(q)\n", " in_collision = self.pin_robot.check_calf_collision(q)\n", " return in_collision\n", "\n", " def clip_velocity(self, v_x, v_y, ω_z):\n", " _v = np.array([[v_x], [v_y]])\n", " _scale = np.sqrt(_v.T @ self.P_v_max @ _v)[0, 0]\n", "\n", " if _scale > 1.0:\n", " scale = 1.0 / _scale\n", " else:\n", " scale = 1.0\n", "\n", " return scale * v_x, scale * v_y, np.clip(ω_z, self.ωz_min, self.ωz_max)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "robot.getBatteryState()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from Go2Py.robot.interface.dds import GO2Real\n", "robot = GO2Real()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "robot.getIMU()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 2 }