Go2Py_SIM/examples/fsm_sim.ipynb

382 lines
12 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from Go2Py.sim.mujoco import Go2Sim\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"robot = Go2Sim()\n",
"robot.sitDownReset()"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"from pynput import keyboard\n",
"class BaseRemote:\n",
" def __init__(self):\n",
" pass\n",
" def startSeq(self):\n",
" return False\n",
" def standUpDownSeq(self):\n",
" return False\n",
"\n",
" def flushStates(self):\n",
" pass\n",
"\n",
"remote = BaseRemote()\n",
"\n",
"class KeyboardRemote(BaseRemote):\n",
" def __init__(self):\n",
" super().__init__()\n",
" self.start_seq_flag = False\n",
" self.stand_up_down_seq_flag = False\n",
" self.listener_thread = threading.Thread(target=self._listen_to_keyboard)\n",
" self.listener_thread.daemon = True\n",
" self.listener_thread.start()\n",
"\n",
" def _on_press(self, key):\n",
" try:\n",
" if key.char == 's': # Start sequence\n",
" self.start_seq_flag = True\n",
" elif key.char == 'u': # Stand up/down sequence\n",
" self.stand_up_down_seq_flag = True\n",
" except AttributeError:\n",
" pass # Special keys (like space) will be handled here\n",
"\n",
" \n",
" def _on_release(self, key):\n",
" try:\n",
" if key.char == 's': # Start sequence\n",
" self.start_seq_flag = False\n",
" elif key.char == 'u': # Stand up/down sequence\n",
" self.stand_up_down_seq_flag = False\n",
" except AttributeError:\n",
" pass # Special keys (like space) will be handled here\n",
"\n",
" def _listen_to_keyboard(self):\n",
" with keyboard.Listener(on_press=self._on_press, on_release=self._on_release) as listener:\n",
" listener.join()\n",
"\n",
" def startSeq(self):\n",
" if self.start_seq_flag:\n",
" self.start_seq_flag = False\n",
" return True\n",
" return False\n",
"\n",
" def standUpDownSeq(self):\n",
" if self.stand_up_down_seq_flag:\n",
" self.stand_up_down_seq_flag = False\n",
" return True\n",
" return False\n",
"\n",
" def flushStates(self):\n",
" self.stand_up_down_seq_flag = False\n",
" self.start_seq_flag = False"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"remote = KeyboardRemote()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"class SafetyHypervisor():\n",
" def __init__(self, robot):\n",
" self.robot = robot\n",
"\n",
" def unsafe(self):\n",
" return False\n",
" \n",
" def controlTimeout(self):\n",
" if time.time() - robot.latest_command_stamp > 0.1:\n",
" print('controller timeout')\n",
" return True\n",
" else:\n",
" return False \n",
"\n",
"safety = SafetyHypervisor(robot)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"import threading\n",
"import numpy as np\n",
"import time\n",
"from enum import Enum\n",
"\n",
"\n",
"class FSM:\n",
" def __init__(self, robot, remote, safety_hypervisor, user_controller_callback=None):\n",
" self.robot = robot\n",
" self.remote = remote\n",
" self.remote.flushStates()\n",
" self.safety = safety_hypervisor\n",
" self.user_controller_callback = user_controller_callback\n",
"\n",
" self.state = \"damping\"\n",
"\n",
" # self.tracking_kp = np.array(4*[200, 200, 350.]).reshape(12)\n",
" # self.tracking_kv = np.array(12*[10.])\n",
"\n",
" self.tracking_kp = np.array(4*[150, 150, 150.]).reshape(12)\n",
" self.tracking_kv = np.array(12*[3.])\n",
" self.damping_kv = np.array(12*[2.])\n",
"\n",
"\n",
" self.tracking_complete = True\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n",
" self.fsm_dT = 1./50.\n",
" self.control_dT = 1./400.\n",
" self.dT = self.fsm_dT\n",
"\n",
" self.modes = {\"tracking\":self.trackingControlUpdate,\n",
" \"damping\" :self.dampingControlUpdate,\n",
" \"user\": self.userControlUpdate,\n",
" }\n",
" self.setMode(\"damping\")\n",
"\n",
" self.running = True\n",
" self.fsm_thread = threading.Thread(target = self.update)\n",
" self.fsm_thread.start()\n",
" # if the robot is a simulation, create a thread for stepping it\n",
" if robot.simulated:\n",
" self.sim_thread = threading.Thread(target=self.simUpdate)\n",
" self.sim_thread.start()\n",
"\n",
" def setMode(self, mode):\n",
" assert mode in self.modes.keys(), 'the requested control update mode is not implemented'\n",
" self.updateCommands = self.modes[mode]\n",
" # print(f'setting mode to {mode}')\n",
" \n",
" def moveTo(self, target, duration=0.5):\n",
" # assert self.tracking_complete, 'The previous moveTo command is not completed yet!'\n",
" self.q_start = self.robot.getJointStates()['q']\n",
" self.q_target = target\n",
" self.time = 0.0\n",
" self.move_duration = duration\n",
" self.q_des = lambda t: [self.q_start + np.clip((t)/self.move_duration,0, 1)*(self.q_target - self.q_start), \\\n",
" True if np.clip((t)/self.move_duration,0, 1)==1 else False] # q_des(t), Movement finished\n",
" self.tracking_complete = False\n",
" self.setMode(\"tracking\")\n",
"\n",
" def trackingControlUpdate(self):\n",
" self.time +=self.dT\n",
" q_des, done = self.q_des(self.time)\n",
" self.robot.setCommands(q_des, np.zeros(12), self.tracking_kp, self.tracking_kv, np.zeros(12))\n",
" self.tracking_complete = done\n",
"\n",
" def dampingControlUpdate(self):\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), self.damping_kv, np.zeros(12))\n",
"\n",
" def userControlUpdate(self):\n",
" if self.user_controller_callback is not None:\n",
" user_controller_callback(robot, remote)\n",
"\n",
" def simUpdate(self):\n",
" while self.running:\n",
" self.robot.step()\n",
" time.sleep(robot.dt)\n",
" \n",
" def update(self):\n",
" while self.running:\n",
" getattr(self, self.state)()\n",
" time.sleep(self.dT)\n",
" self.updateCommands()\n",
" \n",
" def close(self):\n",
" self.running = False\n",
"\n",
" # The following functions each are the states of the FSM\n",
" def damping(self):\n",
" # print('damping')\n",
" if self.remote.standUpDownSeq():\n",
" self.moveTo(robot.prestanding_q, 1)\n",
" self.state = \"pre_standing\"\n",
" else:\n",
" self.setMode(\"damping\")\n",
" self.state = \"damping\"\n",
"\n",
" def pre_standing(self):\n",
" # print(\"pre_stance\")\n",
" if self.tracking_complete:\n",
" self.moveTo(robot.standing_q, duration=1.5)\n",
" self.state = 'standing'\n",
" else:\n",
" self.state = \"pre_standing\"\n",
" \n",
" def standing(self):\n",
" # print(\"standing\")\n",
" if self.tracking_complete:\n",
" # self.moveTo(robot.standing_q, duration=1)\n",
" self.state = 'locked_stance'\n",
" else:\n",
" self.state = \"standing\"\n",
"\n",
" def locked_stance(self):\n",
" # print(\"locked_stance\")\n",
" if self.remote.startSeq():\n",
" self.setMode(\"user\")\n",
" self.dT = self.control_dT\n",
" self.state = \"user_loop\"\n",
" self.robot.setCommands(np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12), np.zeros(12))\n",
" elif self.remote.standUpDownSeq() or robot.overheat():\n",
" self.moveTo(robot.sitting_q, duration = 1.5)\n",
" self.state = \"sitting\"\n",
"\n",
" def user_loop(self):\n",
" # print(\"user_loop\")\n",
" if self.safety.unsafe():\n",
" self.dT = self.fsm_dT\n",
" self.setMode(\"damping\")\n",
" elif self.remote.standUpDownSeq() or self.safety.controlTimeout():\n",
" self.dT = self.fsm_dT\n",
" self.moveTo(robot.standing_q, duration = 1)\n",
" self.timer = time.time()\n",
" self.state = \"switch_back_to_locked_stance\"\n",
" else:\n",
" self.state = \"user_loop\"\n",
"\n",
" def sitting(self):\n",
" # print('sitting')\n",
" if self.tracking_complete:\n",
" self.setMode(\"damping\")\n",
" self.state = 'damping'\n",
" else:\n",
" self.state = \"sitting\"\n",
"\n",
" def switch_back_to_locked_stance(self):\n",
" if time.time()-self.timer > 0.5:\n",
" # print(\"going back to locked stance\")\n",
" self.state = \"locked_stance\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Exception in thread Thread-9:\n",
"Traceback (most recent call last):\n",
" File \"/home/rstaion/miniconda3/envs/b1py/lib/python3.8/threading.py\", line 932, in _bootstrap_inner\n",
" self.run()\n",
" File \"/home/rstaion/miniconda3/envs/b1py/lib/python3.8/threading.py\", line 870, in run\n",
" self._target(*self._args, **self._kwargs)\n",
" File \"/tmp/ipykernel_130541/2701327530.py\", line 76, in simUpdate\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" File \"/home/rstaion/projects/rooholla/locomotion/Go2Py/Go2Py/sim/mujoco.py\", line 122, in step\n",
" tau = np.diag(self.kp)@(self.q_des-q).reshape(12,1)+ \\\n",
"numpy.core._exceptions.UFuncTypeError: ufunc 'subtract' did not contain a loop with signature matching types (dtype('float64'), dtype('<U1')) -> None\n"
]
}
],
"source": [
"fsm = FSM(robot, remote, safety)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.moveTo(robot.sitting_q, 1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.moveTo(robot.standing_q, 2)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.setMode(\"damping\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fsm.close()\n",
"robot.sitDownReset()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "b1-env",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.18"
}
},
"nbformat": 4,
"nbformat_minor": 2
}