livetalking/ernerf/data_utils/deepspeech_features/deepspeech_features.py

276 lines
8.8 KiB
Python

"""
DeepSpeech features processing routines.
NB: Based on VOCA code. See the corresponding license restrictions.
"""
__all__ = ['conv_audios_to_deepspeech']
import numpy as np
import warnings
import resampy
from scipy.io import wavfile
from python_speech_features import mfcc
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()
def conv_audios_to_deepspeech(audios,
out_files,
num_frames_info,
deepspeech_pb_path,
audio_window_size=1,
audio_window_stride=1):
"""
Convert list of audio files into files with DeepSpeech features.
Parameters
----------
audios : list of str or list of None
Paths to input audio files.
out_files : list of str
Paths to output files with DeepSpeech features.
num_frames_info : list of int
List of numbers of frames.
deepspeech_pb_path : str
Path to DeepSpeech 0.1.0 frozen model.
audio_window_size : int, default 16
Audio window size.
audio_window_stride : int, default 1
Audio window stride.
"""
# deepspeech_pb_path="/disk4/keyu/DeepSpeech/deepspeech-0.9.2-models.pbmm"
graph, logits_ph, input_node_ph, input_lengths_ph = prepare_deepspeech_net(
deepspeech_pb_path)
with tf.compat.v1.Session(graph=graph) as sess:
for audio_file_path, out_file_path, num_frames in zip(audios, out_files, num_frames_info):
print(audio_file_path)
print(out_file_path)
audio_sample_rate, audio = wavfile.read(audio_file_path)
if audio.ndim != 1:
warnings.warn(
"Audio has multiple channels, the first channel is used")
audio = audio[:, 0]
ds_features = pure_conv_audio_to_deepspeech(
audio=audio,
audio_sample_rate=audio_sample_rate,
audio_window_size=audio_window_size,
audio_window_stride=audio_window_stride,
num_frames=num_frames,
net_fn=lambda x: sess.run(
logits_ph,
feed_dict={
input_node_ph: x[np.newaxis, ...],
input_lengths_ph: [x.shape[0]]}))
net_output = ds_features.reshape(-1, 29)
win_size = 16
zero_pad = np.zeros((int(win_size / 2), net_output.shape[1]))
net_output = np.concatenate(
(zero_pad, net_output, zero_pad), axis=0)
windows = []
for window_index in range(0, net_output.shape[0] - win_size, 2):
windows.append(
net_output[window_index:window_index + win_size])
print(np.array(windows).shape)
np.save(out_file_path, np.array(windows))
def prepare_deepspeech_net(deepspeech_pb_path):
"""
Load and prepare DeepSpeech network.
Parameters
----------
deepspeech_pb_path : str
Path to DeepSpeech 0.1.0 frozen model.
Returns
-------
graph : obj
ThensorFlow graph.
logits_ph : obj
ThensorFlow placeholder for `logits`.
input_node_ph : obj
ThensorFlow placeholder for `input_node`.
input_lengths_ph : obj
ThensorFlow placeholder for `input_lengths`.
"""
# Load graph and place_holders:
with tf.io.gfile.GFile(deepspeech_pb_path, "rb") as f:
graph_def = tf.compat.v1.GraphDef()
graph_def.ParseFromString(f.read())
graph = tf.compat.v1.get_default_graph()
tf.import_graph_def(graph_def, name="deepspeech")
logits_ph = graph.get_tensor_by_name("deepspeech/logits:0")
input_node_ph = graph.get_tensor_by_name("deepspeech/input_node:0")
input_lengths_ph = graph.get_tensor_by_name("deepspeech/input_lengths:0")
return graph, logits_ph, input_node_ph, input_lengths_ph
def pure_conv_audio_to_deepspeech(audio,
audio_sample_rate,
audio_window_size,
audio_window_stride,
num_frames,
net_fn):
"""
Core routine for converting audion into DeepSpeech features.
Parameters
----------
audio : np.array
Audio data.
audio_sample_rate : int
Audio sample rate.
audio_window_size : int
Audio window size.
audio_window_stride : int
Audio window stride.
num_frames : int or None
Numbers of frames.
net_fn : func
Function for DeepSpeech model call.
Returns
-------
np.array
DeepSpeech features.
"""
target_sample_rate = 16000
if audio_sample_rate != target_sample_rate:
resampled_audio = resampy.resample(
x=audio.astype(np.float),
sr_orig=audio_sample_rate,
sr_new=target_sample_rate)
else:
resampled_audio = audio.astype(np.float)
input_vector = conv_audio_to_deepspeech_input_vector(
audio=resampled_audio.astype(np.int16),
sample_rate=target_sample_rate,
num_cepstrum=26,
num_context=9)
network_output = net_fn(input_vector)
# print(network_output.shape)
deepspeech_fps = 50
video_fps = 50 # Change this option if video fps is different
audio_len_s = float(audio.shape[0]) / audio_sample_rate
if num_frames is None:
num_frames = int(round(audio_len_s * video_fps))
else:
video_fps = num_frames / audio_len_s
network_output = interpolate_features(
features=network_output[:, 0],
input_rate=deepspeech_fps,
output_rate=video_fps,
output_len=num_frames)
# Make windows:
zero_pad = np.zeros((int(audio_window_size / 2), network_output.shape[1]))
network_output = np.concatenate(
(zero_pad, network_output, zero_pad), axis=0)
windows = []
for window_index in range(0, network_output.shape[0] - audio_window_size, audio_window_stride):
windows.append(
network_output[window_index:window_index + audio_window_size])
return np.array(windows)
def conv_audio_to_deepspeech_input_vector(audio,
sample_rate,
num_cepstrum,
num_context):
"""
Convert audio raw data into DeepSpeech input vector.
Parameters
----------
audio : np.array
Audio data.
audio_sample_rate : int
Audio sample rate.
num_cepstrum : int
Number of cepstrum.
num_context : int
Number of context.
Returns
-------
np.array
DeepSpeech input vector.
"""
# Get mfcc coefficients:
features = mfcc(
signal=audio,
samplerate=sample_rate,
numcep=num_cepstrum)
# We only keep every second feature (BiRNN stride = 2):
features = features[::2]
# One stride per time step in the input:
num_strides = len(features)
# Add empty initial and final contexts:
empty_context = np.zeros((num_context, num_cepstrum), dtype=features.dtype)
features = np.concatenate((empty_context, features, empty_context))
# Create a view into the array with overlapping strides of size
# numcontext (past) + 1 (present) + numcontext (future):
window_size = 2 * num_context + 1
train_inputs = np.lib.stride_tricks.as_strided(
features,
shape=(num_strides, window_size, num_cepstrum),
strides=(features.strides[0],
features.strides[0], features.strides[1]),
writeable=False)
# Flatten the second and third dimensions:
train_inputs = np.reshape(train_inputs, [num_strides, -1])
train_inputs = np.copy(train_inputs)
train_inputs = (train_inputs - np.mean(train_inputs)) / \
np.std(train_inputs)
return train_inputs
def interpolate_features(features,
input_rate,
output_rate,
output_len):
"""
Interpolate DeepSpeech features.
Parameters
----------
features : np.array
DeepSpeech features.
input_rate : int
input rate (FPS).
output_rate : int
Output rate (FPS).
output_len : int
Output data length.
Returns
-------
np.array
Interpolated data.
"""
input_len = features.shape[0]
num_features = features.shape[1]
input_timestamps = np.arange(input_len) / float(input_rate)
output_timestamps = np.arange(output_len) / float(output_rate)
output_features = np.zeros((output_len, num_features))
for feature_idx in range(num_features):
output_features[:, feature_idx] = np.interp(
x=output_timestamps,
xp=input_timestamps,
fp=features[:, feature_idx])
return output_features