Adding audio frames reading capability

This commit is contained in:
CarolinePascal 2025-04-03 11:41:05 +02:00
parent e4eebd0680
commit 058478a74d
No known key found for this signature in database
3 changed files with 111 additions and 37 deletions

View File

@ -249,9 +249,14 @@ def control_loop(
timestamp = 0
start_episode_t = time.perf_counter()
if teleoperate and dataset is not None:
if dataset is not None:
for microphone_key, microphone in robot.microphones.items():
#Start recording both in file writing and data reading mode
dataset.add_microphone_recording(microphone, microphone_key)
else:
for _, microphone in robot.microphones.items():
# Start recording only in data reading mode
microphone.start_recording()
while timestamp < control_time_s:
start_loop_t = time.perf_counter()
@ -271,7 +276,9 @@ def control_loop(
action = {"action": action}
if dataset is not None:
frame = {**observation, **action, "task": single_task}
#Remove audio frames which are directly written in a dedicated file
audioless_observation = {key: observation[key] for key in observation if key not in robot.microphones}
frame = {**audioless_observation, **action, "task": single_task}
dataset.add_frame(frame)
# TODO(Steven): This should be more general (for RemoteRobot instead of checking the name, but anyways it will change soon)

View File

@ -119,16 +119,13 @@ class Microphone:
config = MicrophoneConfig(microphone_index=0, sampling_rate=16000, channels=[1], data_type="int16")
microphone = Microphone(config)
microphone.connect()
microphone.start_recording("some/output/file.wav")
...
microphone.stop_recording()
#OR
microphone.start_recording()
audio_readings = microphone.read() #Gets all recorded audio data since the last read or since the beginning of the recording
...
microphone.stop_recording()
last_recorded_audio_chunk = microphone.queue.get()
microphone.disconnect()
```
"""
@ -145,12 +142,16 @@ class Microphone:
#Input audio stream
self.stream = None
#Thread-safe concurrent queue to store the recorded audio
self.queue = Queue()
self.thread = None
self.stop_event = None
self.logs = {}
#Thread-safe concurrent queue to store the recorded/read audio
self.record_queue = Queue()
self.read_queue = Queue()
#Thread to handle data reading and file writing in a separate thread (safely)
self.record_thread = None
self.record_stop_event = None
self.logs = {}
self.is_connected = False
def connect(self) -> None:
@ -213,53 +214,101 @@ class Microphone:
def _audio_callback(self, indata, frames, time, status) -> None :
if status:
logging.warning(status)
#slicing makes copy unecessary
self.queue.put(indata[:,self.channels])
# Slicing makes copy unecessary
# Two separate queues are necessary because .get() also pops the data from the queue
self.record_queue.put(indata[:,self.channels])
self.read_queue.put(indata[:,self.channels])
def _read_write_loop(self, output_file : Path) -> None:
output_file = Path(output_file)
if output_file.exists():
shutil.rmtree(
output_file,
)
def _record_loop(self, output_file: Path) -> None:
with sf.SoundFile(output_file, mode='x', samplerate=self.sampling_rate,
channels=max(self.channels)+1, subtype=sf.default_subtype(output_file.suffix[1:])) as file:
while not self.stop_event.is_set():
file.write(self.queue.get())
while not self.record_stop_event.is_set():
file.write(self.record_queue.get())
#self.record_queue.task_done()
def _read(self) -> np.ndarray:
"""
Gets audio data from the queue and coverts it to a numpy array.
-> PROS : Inherently thread safe, no need to lock the queue, lightweight CPU usage
-> CONS : Reading duration does not scale well with the number of channels and reading duration
"""
try:
audio_readings = self.read_queue.queue
except Queue.Empty:
audio_readings = np.empty((0, len(self.channels)))
else:
#TODO(CarolinePascal): Check if this is the fastest way to do it
self.read_queue = Queue()
with self.read_queue.mutex:
self.read_queue.queue.clear()
#self.read_queue.all_tasks_done.notify_all()
audio_readings = np.array(audio_readings).reshape(-1, len(self.channels))
return audio_readings
def read(self) -> np.ndarray:
if not self.is_connected:
raise RobotDeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
if not self.stream.active:
raise RuntimeError(f"Microphone {self.microphone_index} is not recording.")
start_time = time.perf_counter()
audio_readings = self._read()
# log the number of seconds it took to read the audio chunk
self.logs["delta_timestamp_s"] = time.perf_counter() - start_time
# log the utc time at which the audio chunk was received
self.logs["timestamp_utc"] = capture_timestamp_utc()
return audio_readings
def start_recording(self, output_file : str | None = None) -> None:
if not self.is_connected:
raise RobotDeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
self.read_queue = Queue()
with self.read_queue.mutex:
self.read_queue.queue.clear()
#self.read_queue.all_tasks_done.notify_all()
self.record_queue = Queue()
with self.record_queue.mutex:
self.record_queue.queue.clear()
#self.record_queue.all_tasks_done.notify_all()
#Recording case
if output_file is not None:
self.stop_event = Event()
self.thread = Thread(target=self._read_write_loop, args=(output_file,))
self.thread.daemon = True
self.thread.start()
output_file = Path(output_file)
if output_file.exists():
output_file.unlink()
self.record_stop_event = Event()
self.record_thread = Thread(target=self._record_loop, args=(output_file,))
self.record_thread.daemon = True
self.record_thread.start()
self.logs["start_timestamp"] = capture_timestamp_utc()
self.stream.start()
def stop_recording(self) -> None:
if not self.is_connected:
raise RobotDeviceNotConnectedError(f"Microphone {self.microphone_index} is not connected.")
self.logs["stop_timestamp"] = capture_timestamp_utc()
if self.thread is not None:
self.stop_event.set()
self.thread.join()
self.thread = None
self.stop_event = None
if self.record_thread is not None:
#self.record_queue.join()
self.record_stop_event.set()
self.record_thread.join()
self.record_thread = None
self.record_stop_event = None
if self.stream.active:
self.stream.stop() #Wait for all buffers to be processed
#Remark : stream.abort() flushes the buffers !
self.logs["duration"] = self.logs["stop_timestamp"] - self.logs["start_timestamp"]
def disconnect(self) -> None:
if not self.is_connected:

View File

@ -541,6 +541,15 @@ class ManipulatorRobot:
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Capture audio from microphones
audio = {}
for name in self.microphones:
before_audioread_t = time.perf_counter()
audio[name] = self.microphones[name].read()
audio[name] = torch.from_numpy(audio[name])
self.logs[f"read_microphone_{name}_dt_s"] = self.microphones[name].logs["delta_timestamp_s"]
self.logs[f"async_read_microphone_{name}_dt_s"] = time.perf_counter() - before_audioread_t
# Populate output dictionaries
obs_dict, action_dict = {}, {}
obs_dict["observation.state"] = state
@ -581,6 +590,15 @@ class ManipulatorRobot:
self.logs[f"read_camera_{name}_dt_s"] = self.cameras[name].logs["delta_timestamp_s"]
self.logs[f"async_read_camera_{name}_dt_s"] = time.perf_counter() - before_camread_t
# Capture audio from microphones
audio = {}
for name in self.microphones:
before_audioread_t = time.perf_counter()
audio[name] = self.microphones[name].read()
audio[name] = torch.from_numpy(audio[name])
self.logs[f"read_microphone_{name}_dt_s"] = self.microphones[name].logs["delta_timestamp_s"]
self.logs[f"async_read_microphone_{name}_dt_s"] = time.perf_counter() - before_audioread_t
# Populate output dictionaries and format to pytorch
obs_dict = {}
obs_dict["observation.state"] = state