ros2 teleoperate

This commit is contained in:
Yoi Hibino 2025-04-11 22:25:03 -07:00
parent b568de35ad
commit 504748c430
4 changed files with 481 additions and 0 deletions

View File

@ -0,0 +1,148 @@
# ROS2 Distributed Teleoperation
This tutorial explains how to set up a distributed teleoperation system using ROS2, allowing a leader robot on one computer to control a follower robot on another computer over a network.
## Prerequisites
1. ROS2 (Foxy or newer) installed on both computers
2. Network connectivity between the leader and follower computers
3. lerobot package installed on both computers
## Installation
Ensure you have the required ROS2 packages for sensor messages:
```bash
pip install sensor_msgs
```
## Setup
### 1. Configure ROS2 Domain ID (optional but recommended)
To isolate your ROS2 network from others, set the same domain ID on both computers:
```bash
# On both leader and follower computers
export ROS_DOMAIN_ID=42 # Choose any number between 0-232
```
### 2. Network Configuration
Ensure both computers can communicate over the network:
1. Connect both computers to the same network
2. Verify they can ping each other
3. Make sure any firewalls allow ROS2 communication (default ports)
## Running the Distributed Teleoperation
### 1. Leader Robot Computer
On the computer connected to the leader robot:
```bash
python lerobot/scripts/run_ros2_leader.py \
--robot.type=koch \
--robot.leader_arms.main.port=/dev/ttyUSB0 \
--ros2_config.node_name=lerobot_teleop \
--ros2_config.topic_name=joint_states \
--ros2_config.publish_rate_hz=200.0
```
Replace `/dev/ttyUSB0` with the actual port of your leader robot.
### 2. Follower Robot Computer
On the computer connected to the follower robot:
```bash
python lerobot/scripts/run_ros2_follower.py \
--robot.type=koch \
--robot.follower_arms.main.port=/dev/ttyUSB0 \
--ros2_config.node_name=lerobot_teleop \
--ros2_config.topic_name=joint_states
```
Replace `/dev/ttyUSB0` with the actual port of your follower robot.
### Note on Topic Names
Make sure both leader and follower use the same topic name (default is `joint_states`).
## Troubleshooting
### 1. Check ROS2 Discovery
Verify nodes can discover each other:
```bash
# On either computer
ros2 topic list
```
You should see `/joint_states` in the list.
### 2. Monitor Message Flow
Check if messages are being published:
```bash
ros2 topic echo /joint_states
```
### 3. Network Issues
If nodes can't discover each other:
- Ensure both computers are on the same network
- Check if ROS_DOMAIN_ID is set to the same value on both computers
- Verify firewall settings
- Try using the ROS_LOCALHOST_ONLY=0 environment variable
### 4. Debug Node Status
```bash
ros2 node list
ros2 node info /lerobot_teleop_leader # or follower
```
## Advanced Configuration
### Quality of Service Settings
For teleoperation over unreliable networks, you can adjust QoS settings:
```bash
# Use reliable QoS instead of best effort (might increase latency but improve reliability)
--ros2_config.use_best_effort_qos=false
```
### Publishing Rate
Adjust the rate at which joint states are published:
```bash
# On leader robot, set a lower rate for unreliable networks
--ros2_config.publish_rate_hz=100.0
```
## Examples
### Using with myCobot 280
For MyCobot robots, ensure you've set up the correct motor types:
Leader:
```bash
python lerobot/scripts/run_ros2_leader.py \
--robot.type=mycobot \
--robot.leader_arms.main.port=/dev/ttyUSB0 \
--ros2_config.publish_rate_hz=100.0
```
Follower:
```bash
python lerobot/scripts/run_ros2_follower.py \
--robot.type=mycobot \
--robot.follower_arms.main.port=/dev/ttyUSB0
```

View File

@ -0,0 +1,203 @@
#!/usr/bin/env python3
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""ROS2 nodes for robot teleoperation between leader and follower robots."""
import time
from dataclasses import dataclass
from typing import Dict, List, Optional
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy
from sensor_msgs.msg import JointState
import torch
from lerobot.common.robot_devices.control_utils import busy_wait
from lerobot.common.robot_devices.robots.manipulator import ManipulatorRobot
from lerobot.common.robot_devices.robots.errors import RobotDeviceNotConnectedError
@dataclass
class ROS2TeleopConfig:
"""Configuration for ROS2 teleoperation."""
node_name: str = "lerobot_teleop"
topic_name: str = "joint_states"
publish_rate_hz: float = 200.0 # Default publish rate
use_best_effort_qos: bool = True # Use best effort QoS for real-time performance
class LeaderNode(Node):
"""ROS2 node for the leader robot that publishes joint states."""
def __init__(self, robot: ManipulatorRobot, config: ROS2TeleopConfig):
super().__init__(config.node_name + "_leader")
self.robot = robot
self.config = config
# Set up QoS profile - for real-time control, best effort is often better than reliable
qos = QoSProfile(depth=10)
if config.use_best_effort_qos:
qos.reliability = QoSReliabilityPolicy.BEST_EFFORT
# Create publisher
self.publisher = self.create_publisher(
JointState,
config.topic_name,
qos
)
# Create timer for publishing at specified rate
period = 1.0 / config.publish_rate_hz
self.timer = self.create_timer(period, self.publish_joint_states)
self.get_logger().info(f"Leader node initialized, publishing to {config.topic_name}")
def publish_joint_states(self):
"""Read joint states from leader robot and publish them."""
if not self.robot.is_connected:
self.get_logger().error("Robot is not connected")
return
# Read positions from all leader arms
leader_pos = {}
joint_names = []
positions = []
try:
for name in self.robot.leader_arms:
pos = self.robot.leader_arms[name].read("Present_Position")
leader_pos[name] = pos
# Extend the joint names and positions lists
motor_names = self.robot.leader_arms[name].motor_names
joint_names.extend([f"{name}_{motor_name}" for motor_name in motor_names])
positions.extend(pos.tolist())
# Create and publish the message
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = joint_names
msg.position = positions
self.publisher.publish(msg)
except Exception as e:
self.get_logger().error(f"Failed to publish joint states: {e}")
class FollowerNode(Node):
"""ROS2 node for the follower robot that subscribes to joint states."""
def __init__(self, robot: ManipulatorRobot, config: ROS2TeleopConfig):
super().__init__(config.node_name + "_follower")
self.robot = robot
self.config = config
self.joint_positions = {} # Store the latest joint positions
# Set up QoS profile
qos = QoSProfile(depth=10)
if config.use_best_effort_qos:
qos.reliability = QoSReliabilityPolicy.BEST_EFFORT
# Create subscription
self.subscription = self.create_subscription(
JointState,
config.topic_name,
self.joint_states_callback,
qos
)
self.get_logger().info(f"Follower node initialized, subscribing to {config.topic_name}")
def joint_states_callback(self, msg: JointState):
"""Process incoming joint states and command the follower robot."""
if not self.robot.is_connected:
self.get_logger().error("Robot is not connected")
return
try:
# Parse the message and organize by arm name
arm_positions = {}
for i, joint_name in enumerate(msg.name):
# Joint names are expected to be in format: "arm_name_motor_name"
parts = joint_name.split("_", 1)
if len(parts) < 2:
continue
arm_name, _ = parts
if arm_name not in arm_positions:
arm_positions[arm_name] = []
arm_positions[arm_name].append(msg.position[i])
# Send positions to follower arms that match leader arms
for name, positions in arm_positions.items():
if name in self.robot.follower_arms:
# Convert to numpy array
pos_array = np.array(positions, dtype=np.float32)
# Store the latest positions
self.joint_positions[name] = pos_array
# Send to the follower robot
self.robot.follower_arms[name].write("Goal_Position", pos_array)
except Exception as e:
self.get_logger().error(f"Failed to process joint states: {e}")
def run_ros2_leader(robot: ManipulatorRobot, config: Optional[ROS2TeleopConfig] = None):
"""Run the ROS2 leader node until interrupted."""
if not robot.is_connected:
robot.connect()
if config is None:
config = ROS2TeleopConfig()
rclpy.init()
node = LeaderNode(robot, config)
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
def run_ros2_follower(robot: ManipulatorRobot, config: Optional[ROS2TeleopConfig] = None):
"""Run the ROS2 follower node until interrupted."""
if not robot.is_connected:
robot.connect()
if config is None:
config = ROS2TeleopConfig()
rclpy.init()
node = FollowerNode(robot, config)
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python3
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import logging
from pprint import pformat
from dataclasses import asdict, dataclass
from lerobot.common.config import parse_and_overload, parser
from lerobot.common.robot_devices.control_configs import ControlPipelineConfig
from lerobot.common.robot_devices.robots.utils import make_robot_from_config
from lerobot.common.robot_devices.ros2_teleop import ROS2TeleopConfig, run_ros2_follower
from lerobot.common.logging_utils import init_logging
@dataclass
class ROS2FollowerConfig:
"""Configuration for ROS2 follower teleoperation."""
node_name: str = "lerobot_teleop"
topic_name: str = "joint_states"
use_best_effort_qos: bool = True
@parser.wrap()
def run_follower(cfg: ControlPipelineConfig, ros2_config: ROS2FollowerConfig = None):
"""Run a robot as a ROS2 follower that subscribes to joint states."""
init_logging()
if ros2_config is None:
ros2_config = ROS2FollowerConfig()
teleop_config = ROS2TeleopConfig(
node_name=ros2_config.node_name,
topic_name=ros2_config.topic_name,
use_best_effort_qos=ros2_config.use_best_effort_qos
)
logging.info("ROS2 Follower Configuration:")
logging.info(pformat(asdict(teleop_config)))
logging.info("Robot Configuration:")
logging.info(pformat(asdict(cfg.robot)))
robot = make_robot_from_config(cfg.robot)
try:
run_ros2_follower(robot, teleop_config)
except KeyboardInterrupt:
logging.info("Follower node stopped by user")
finally:
if robot.is_connected:
robot.disconnect()
if __name__ == "__main__":
run_follower()

View File

@ -0,0 +1,66 @@
#!/usr/bin/env python3
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import logging
from pprint import pformat
from dataclasses import asdict
from lerobot.common.config import parse_and_overload, parser
from lerobot.common.robot_devices.control_configs import ControlPipelineConfig
from lerobot.common.robot_devices.robots.utils import make_robot_from_config
from lerobot.common.robot_devices.ros2_teleop import ROS2TeleopConfig, run_ros2_leader
from lerobot.common.logging_utils import init_logging
@dataclass
class ROS2LeaderConfig:
"""Configuration for ROS2 leader teleoperation."""
node_name: str = "lerobot_teleop"
topic_name: str = "joint_states"
publish_rate_hz: float = 200.0
use_best_effort_qos: bool = True
@parser.wrap()
def run_leader(cfg: ControlPipelineConfig, ros2_config: ROS2LeaderConfig = None):
"""Run a robot as a ROS2 leader that publishes joint states."""
init_logging()
if ros2_config is None:
ros2_config = ROS2LeaderConfig()
teleop_config = ROS2TeleopConfig(
node_name=ros2_config.node_name,
topic_name=ros2_config.topic_name,
publish_rate_hz=ros2_config.publish_rate_hz,
use_best_effort_qos=ros2_config.use_best_effort_qos
)
logging.info("ROS2 Leader Configuration:")
logging.info(pformat(asdict(teleop_config)))
logging.info("Robot Configuration:")
logging.info(pformat(asdict(cfg.robot)))
robot = make_robot_from_config(cfg.robot)
try:
run_ros2_leader(robot, teleop_config)
except KeyboardInterrupt:
logging.info("Leader node stopped by user")
finally:
if robot.is_connected:
robot.disconnect()
if __name__ == "__main__":
run_leader()