diff --git a/lerobot/common/robot_devices/control_utils.py b/lerobot/common/robot_devices/control_utils.py index 335b6430..d54a9e13 100644 --- a/lerobot/common/robot_devices/control_utils.py +++ b/lerobot/common/robot_devices/control_utils.py @@ -249,9 +249,14 @@ def control_loop( timestamp = 0 start_episode_t = time.perf_counter() - if teleoperate and dataset is not None: + if dataset is not None: for microphone_key, microphone in robot.microphones.items(): + #Start recording both in file writing and data reading mode dataset.add_microphone_recording(microphone, microphone_key) + else: + for _, microphone in robot.microphones.items(): + # Start recording only in data reading mode + microphone.start_recording() while timestamp < control_time_s: start_loop_t = time.perf_counter() @@ -271,7 +276,9 @@ def control_loop( action = {"action": action} if dataset is not None: - frame = {**observation, **action, "task": single_task} + #Remove audio frames which are directly written in a dedicated file + audioless_observation = {key: observation[key] for key in observation if key not in robot.microphones} + frame = {**audioless_observation, **action, "task": single_task} dataset.add_frame(frame) # TODO(Steven): This should be more general (for RemoteRobot instead of checking the name, but anyways it will change soon) diff --git a/lerobot/common/robot_devices/microphones/microphone.py b/lerobot/common/robot_devices/microphones/microphone.py index c26d03c0..b751f2b6 100644 --- a/lerobot/common/robot_devices/microphones/microphone.py +++ b/lerobot/common/robot_devices/microphones/microphone.py @@ -119,16 +119,13 @@ class Microphone: config = MicrophoneConfig(microphone_index=0, sampling_rate=16000, channels=[1], data_type="int16") microphone = Microphone(config) + microphone.connect() microphone.start_recording("some/output/file.wav") ... 
- microphone.stop_recording() - - #OR - - microphone.start_recording() + audio_readings = microphone.read() #Gets all recorded audio data since the last read or since the beginning of the recording ... microphone.stop_recording() - last_recorded_audio_chunk = microphone.queue.get() + microphone.disconnect() ``` """ @@ -145,12 +142,16 @@ class Microphone: #Input audio stream self.stream = None - #Thread-safe concurrent queue to store the recorded audio - self.queue = Queue() - self.thread = None - self.stop_event = None - self.logs = {} + #Thread-safe concurrent queue to store the recorded/read audio + self.record_queue = Queue() + self.read_queue = Queue() + + #Thread to handle data reading and file writing in a separate thread (safely) + self.record_thread = None + self.record_stop_event = None + + self.logs = {} self.is_connected = False def connect(self) -> None: @@ -213,53 +214,101 @@ def _audio_callback(self, indata, frames, time, status) -> None : if status: logging.warning(status) - #slicing makes copy unecessary - self.queue.put(indata[:,self.channels]) + # Slicing makes copy unnecessary + # Two separate queues are necessary because .get() also pops the data from the queue + self.record_queue.put(indata[:,self.channels]) + self.read_queue.put(indata[:,self.channels]) - def _read_write_loop(self, output_file : Path) -> None: - output_file = Path(output_file) - if output_file.exists(): - shutil.rmtree( - output_file, - ) + def _record_loop(self, output_file: Path) -> None: with sf.SoundFile(output_file, mode='x', samplerate=self.sampling_rate, channels=max(self.channels)+1, subtype=sf.default_subtype(output_file.suffix[1:])) as file: - while not self.stop_event.is_set(): - file.write(self.queue.get()) + while not self.record_stop_event.is_set(): + file.write(self.record_queue.get()) + #self.record_queue.task_done() + + def _read(self) -> np.ndarray: + """ + Gets audio data from the queue and converts it to a numpy array.
+ -> PROS : Inherently thread safe, no need to lock the queue, lightweight CPU usage + -> CONS : Reading duration does not scale well with the number of channels and reading duration + """ + try: + audio_readings = self.read_queue.queue + except Queue.Empty: + audio_readings = np.empty((0, len(self.channels))) + else: + #TODO(CarolinePascal): Check if this is the fastest way to do it + self.read_queue = Queue() + with self.read_queue.mutex: + self.read_queue.queue.clear() + #self.read_queue.all_tasks_done.notify_all() + audio_readings = np.array(audio_readings).reshape(-1, len(self.channels)) + + return audio_readings + + def read(self) -> np.ndarray: + + if not self.is_connected: + raise RobotDeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") + if not self.stream.active: + raise RuntimeError(f"Microphone {self.microphone_index} is not recording.") + + start_time = time.perf_counter() + + audio_readings = self._read() + + # log the number of seconds it took to read the audio chunk + self.logs["delta_timestamp_s"] = time.perf_counter() - start_time + + # log the utc time at which the audio chunk was received + self.logs["timestamp_utc"] = capture_timestamp_utc() + + return audio_readings def start_recording(self, output_file : str | None = None) -> None: if not self.is_connected: raise RobotDeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") + self.read_queue = Queue() + with self.read_queue.mutex: + self.read_queue.queue.clear() + #self.read_queue.all_tasks_done.notify_all() + + self.record_queue = Queue() + with self.record_queue.mutex: + self.record_queue.queue.clear() + #self.record_queue.all_tasks_done.notify_all() + + #Recording case if output_file is not None: - self.stop_event = Event() - self.thread = Thread(target=self._read_write_loop, args=(output_file,)) - self.thread.daemon = True - self.thread.start() + output_file = Path(output_file) + if output_file.exists(): + output_file.unlink() + + 
self.record_stop_event = Event() + self.record_thread = Thread(target=self._record_loop, args=(output_file,)) + self.record_thread.daemon = True + self.record_thread.start() - self.logs["start_timestamp"] = capture_timestamp_utc() self.stream.start() def stop_recording(self) -> None: if not self.is_connected: raise RobotDeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.") - - self.logs["stop_timestamp"] = capture_timestamp_utc() - if self.thread is not None: - self.stop_event.set() - self.thread.join() - self.thread = None - self.stop_event = None + if self.record_thread is not None: + #self.record_queue.join() + self.record_stop_event.set() + self.record_thread.join() + self.record_thread = None + self.record_stop_event = None if self.stream.active: self.stream.stop() #Wait for all buffers to be processed #Remark : stream.abort() flushes the buffers ! - self.logs["duration"] = self.logs["stop_timestamp"] - self.logs["start_timestamp"] - def disconnect(self) -> None: if not self.is_connected: diff --git a/lerobot/common/robot_devices/robots/manipulator.py b/lerobot/common/robot_devices/robots/manipulator.py index afa4006a..b5dc60c8 100644 --- a/lerobot/common/robot_devices/robots/manipulator.py +++ b/lerobot/common/robot_devices/robots/manipulator.py @@ -541,6 +541,15 @@ class ManipulatorRobot: self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t + # Capture audio from microphones + audio = {} + for name in self.microphones: + before_audioread_t = time.perf_counter() + audio[name] = self.microphones[name].read() + audio[name] = torch.from_numpy(audio[name]) + self.logs[f"read_microphone_{name}_dt_s"] = self.microphones[name].logs["delta_timestamp_s"] + self.logs[f"async_read_microphone_{name}_dt_s"] = time.perf_counter() - before_audioread_t + # Populate output dictionaries obs_dict, action_dict = {}, {} 
obs_dict["observation.state"] = state @@ -581,6 +590,15 @@ class ManipulatorRobot: self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"] self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t + # Capture audio from microphones + audio = {} + for name in self.microphones: + before_audioread_t = time.perf_counter() + audio[name] = self.microphones[name].read() + audio[name] = torch.from_numpy(audio[name]) + self.logs[f"read_microphone_{name}_dt_s"] = self.microphones[name].logs["delta_timestamp_s"] + self.logs[f"async_read_microphone_{name}_dt_s"] = time.perf_counter() - before_audioread_t + # Populate output dictionaries and format to pytorch obs_dict = {} obs_dict["observation.state"] = state