{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from Go2Py.robot.interface.dds import GO2Real\n", "import time\n", "robot = GO2Real(mode='lowlevel')" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import threading\n", "from pynput import keyboard\n", "class BaseRemote:\n", " def __init__(self):\n", " pass\n", " def startSeq(self):\n", " return False\n", " def standUpDownSeq(self):\n", " return False\n", "\n", " def flushStates(self):\n", " pass\n", "\n", "remote = BaseRemote()\n", "\n", "class KeyboardRemote(BaseRemote):\n", " def __init__(self):\n", " super().__init__()\n", " self.start_seq_flag = False\n", " self.stand_up_down_seq_flag = False\n", " self.listener_thread = threading.Thread(target=self._listen_to_keyboard)\n", " self.listener_thread.daemon = True\n", " self.listener_thread.start()\n", "\n", " def _on_press(self, key):\n", " try:\n", " if key.char == 's': # Start sequence\n", " self.start_seq_flag = True\n", " elif key.char == 'u': # Stand up/down sequence\n", " self.stand_up_down_seq_flag = True\n", " except AttributeError:\n", " pass # Special keys (like space) will be handled here\n", "\n", " \n", " def _on_release(self, key):\n", " try:\n", " if key.char == 's': # Start sequence\n", " self.start_seq_flag = False\n", " elif key.char == 'u': # Stand up/down sequence\n", " self.stand_up_down_seq_flag = False\n", " except AttributeError:\n", " pass # Special keys (like space) will be handled here\n", "\n", " def _listen_to_keyboard(self):\n", " with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:\n", " listener.join()\n", "\n", " def startSeq(self):\n", " if self.start_seq_flag:\n", " self.start_seq_flag = False\n", " return True\n", " return False\n", "\n", " def standUpDownSeq(self):\n", " if self.stand_up_down_seq_flag:\n", " self.stand_up_down_seq_flag = False\n", " return True\n", " return False\n", "\n", " def flushStates(self):\n", " self.stand_up_down_seq_flag = False\n", " self.start_seq_flag = False" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "remote = KeyboardRemote()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "import time\n", "class SafetyHypervisor():\n", " def __init__(self, robot):\n", " self.robot = robot\n", "\n", " def unsafe(self):\n", " return False\n", " \n", " def controlTimeout(self):\n", " if time.time() - robot.latest_command_stamp > 0.1:\n", " print('controller timeout')\n", " return True\n", " else:\n", " return False \n", "\n", "safety = SafetyHypervisor(robot)" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "import threading\n", "import numpy as np\n", "import time\n", "from enum import Enum\n", "\n", "\n", "class FSM:\n", " def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):\n", " self.robot = robot\n", " self.remote = remote\n", " self.remote.flushStates()\n", " self.safety = safety_hypervisor\n", " self.user_controller_callback = user_controller_callback\n", "\n", " self.state = \"damping\"\n", "\n", " # self.tracking_kp = np.array(4*[200, 200, 350.]).reshape(12)\n", " # self.tracking_kv = np.array(12*[10.])\n", "\n", " self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)\n", " self.tracking_kv = np.array(12*[3.])\n", " self.damping_kv = np.array(12*[2.])\n", "\n", "\n", " self.tracking_complete = True\n", " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n", " self.fsm_dT = 1./50.\n", " self.control_dT = 1./400.\n", " self.dT = self.fsm_dT\n", "\n", " self.modes = {\"tracking\":self.trackingControlUpdate,\n", " \"damping\" :self.dampingControlUpdate,\n", " \"user\": self.userControlUpdate,\n", " }\n", " self.setMode(\"damping\")\n", "\n", " self.running = True\n", " self.fsm_thread = threading.Thread(target = self.update)\n", " self.fsm_thread.start()\n", " # if the robot is a simulation, create a thread for stepping it\n", " if robot.simulated:\n", " self.sim_thread = threading.Thread(target=self.simUpdate)\n", " self.sim_thread.start()\n", "\n", " def setMode(self, mode):\n", " assert mode in self.modes.keys(), 'the requested control update mode is not implemented'\n", " self.updateCommands = self.modes[mode]\n", " # print(f'setting mode to {mode}')\n", " \n", " def moveTo(self, target, duration=0.5):\n", " # assert self.tracking_complete, 'The previous moveTo command is not completed yet!'\n", " self.q_start = self.robot.getJointStates()['q']\n", " self.q_target = target\n", " self.time = 0.0\n", " self.move_duration = duration\n", " self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \\\n", " True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished\n", " self.tracking_complete = False\n", " self.setMode(\"tracking\")\n", "\n", " def trackingControlUpdate(self):\n", " self.time +=self.dT\n", " q_des, done = self.q_des(self.time)\n", " self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))\n", " self.tracking_complete = done\n", "\n", " def dampingControlUpdate(self):\n", " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n", "\n", " def userControlUpdate(self):\n", " if self.user_controller_callback is not None:\n", " user_controller_callback(robot, remote)\n", "\n", " def simUpdate(self):\n", " while self.running:\n", " self.robot.step()\n", " time.sleep(robot.dt)\n", " \n", " def update(self):\n", " while self.running:\n", " getattr(self, self.state)()\n", " time.sleep(self.dT)\n", " self.updateCommands()\n", " \n", " def close(self):\n", " self.running = False\n", "\n", " # The following functions each are the states of the FSM\n", " def damping(self):\n", " # print('damping')\n", " if self.remote.standUpDownSeq():\n", " self.moveTo(robot.prestanding_q, 1)\n", " self.state = \"pre_standing\"\n", " else:\n", " self.setMode(\"damping\")\n", " self.state = \"damping\"\n", "\n", " def pre_standing(self):\n", " # print(\"pre_stance\")\n", " if self.tracking_complete:\n", " self.moveTo(robot.standing_q, duration=1.5)\n", " self.state = 'standing'\n", " else:\n", " self.state = \"pre_standing\"\n", " \n", " def standing(self):\n", " # print(\"standing\")\n", " if self.tracking_complete:\n", " # self.moveTo(robot.standing_q, duration=1)\n", " self.state = 'locked_stance'\n", " else:\n", " self.state = \"standing\"\n", "\n", " def locked_stance(self):\n", " # print(\"locked_stance\")\n", " if self.remote.startSeq():\n", " self.setMode(\"user\")\n", " self.dT = self.control_dT\n", " self.state = \"user_loop\"\n", " self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))\n", " elif self.remote.standUpDownSeq() or robot.overheat():\n", " self.moveTo(robot.sitting_q, duration = 1.5)\n", " self.state = \"sitting\"\n", "\n", " def user_loop(self):\n", " # print(\"user_loop\")\n", " if self.safety.unsafe():\n", " self.dT = self.fsm_dT\n", " self.setMode(\"damping\")\n", " elif self.remote.standUpDownSeq() or self.safety.controlTimeout():\n", " self.dT = self.fsm_dT\n", " self.moveTo(robot.standing_q, duration = 1)\n", " self.timer = time.time()\n", " self.state = \"switch_back_to_locked_stance\"\n", " else:\n", " self.state = \"user_loop\"\n", "\n", " def sitting(self):\n", " # print('sitting')\n", " if self.tracking_complete:\n", " self.setMode(\"damping\")\n", " self.state = 'damping'\n", " else:\n", " self.state = \"sitting\"\n", "\n", " def switch_back_to_locked_stance(self):\n", " if time.time()-self.timer > 0.5:\n", " # print(\"going back to locked stance\")\n", " self.state = \"locked_stance\"" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "fsm = FSM(robot, remote, safety)" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "fsm.close()" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'q': array([-0.02495611, 1.26249647, -2.82826662, 0.04563564, 1.2505368 ,\n", " -2.7933557 , -0.30623949, 1.28283751, -2.82314873, 0.26400229,\n", " 1.29355574, -2.84276843]),\n", " 'dq': array([ 0.03875524, 0.0155021 , 0.01011006, 0.00775105, 0.03875524,\n", " 0.01415409, -0.04263076, -0.00387552, 0.02426415, -0.00387552,\n", " 0. , -0.02426415]),\n", " 'ddq': array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.]),\n", " 'tau_est': array([-0.02473828, 0.04947656, -0.04741504, -0.07421485, 0. ,\n", " 0. , 0.04947656, 0.04947656, 0. , 0.02473828,\n", " -0.07421485, 0. ]),\n", " 'temperature': array([38., 32., 32., 36., 32., 33., 41., 33., 33., 41., 33., 33.])}" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "robot.getJointStates()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "b1-env", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.18" } }, "nbformat": 4, "nbformat_minor": 2 }