import math
import torch
import numpy as np
import os
import time
import cv2
import glob
import pickle
import copy
import queue
from queue import Queue
from threading import Thread, Event
from io import BytesIO
import soundfile as sf
from ttsreal import EdgeTTS, VoitsTTS, XTTS
from tqdm import tqdm


def read_imgs(img_list):
    """Load every image path in *img_list* with OpenCV and return the frames.

    Args:
        img_list: iterable of filesystem paths to image files.

    Returns:
        list of decoded frames (numpy arrays as produced by ``cv2.imread``).
        NOTE(review): ``cv2.imread`` returns ``None`` for unreadable files and
        that ``None`` is appended as-is — callers presumably rely on every
        path being valid; verify upstream.
    """
    frames = []
    print('reading images...')
    for img_path in tqdm(img_list):
        frame = cv2.imread(img_path)
        frames.append(frame)
    return frames


class BaseReal:
    """Base class for a real-time talking-head session.

    Holds the audio chunking parameters, the selected TTS backend, and the
    per-``audiotype`` custom image/audio cycles loaded from ``opt.customopt``.
    """

    def __init__(self, opt):
        self.opt = opt
        self.sample_rate = 16000
        # Samples per video frame, e.g. 320 at fps=50 (20 ms * 16000 / 1000).
        self.chunk = self.sample_rate // opt.fps

        # Select the TTS backend by name.
        # NOTE(review): an unrecognized opt.tts leaves self.tts unset, which
        # surfaces later as AttributeError — confirm whether other values are
        # handled by subclasses before tightening this.
        if opt.tts == "edgetts":
            self.tts = EdgeTTS(opt, self)
        elif opt.tts == "gpt-sovits":
            self.tts = VoitsTTS(opt, self)
        elif opt.tts == "xtts":
            self.tts = XTTS(opt, self)

        # 0 means the default/silent state; positive values are audiotypes
        # loaded from opt.customopt.
        self.curr_state = 0
        self.custom_img_cycle = {}    # audiotype -> list of frames
        self.custom_audio_cycle = {}  # audiotype -> float32 audio array
        self.custom_audio_index = {}  # audiotype -> current sample offset
        self.custom_index = {}        # audiotype -> current frame index
        self.custom_opt = {}          # audiotype -> original option dict
        self.__loadcustom()

    def __loadcustom(self):
        """Load every custom image cycle and audio track from opt.customopt.

        Each item must provide 'audiotype', 'imgpath' and 'audiopath'; image
        files are sorted by their integer basename.
        """
        for item in self.opt.customopt:
            print(item)
            input_img_list = glob.glob(os.path.join(item['imgpath'], '*.[jpJP][pnPN]*[gG]'))
            input_img_list = sorted(
                input_img_list,
                key=lambda x: int(os.path.splitext(os.path.basename(x))[0]),
            )
            self.custom_img_cycle[item['audiotype']] = read_imgs(input_img_list)
            self.custom_audio_cycle[item['audiotype']], sample_rate = sf.read(
                item['audiopath'], dtype='float32')
            self.custom_audio_index[item['audiotype']] = 0
            self.custom_index[item['audiotype']] = 0
            self.custom_opt[item['audiotype']] = item

    def init_customindex(self):
        """Reset the playback state: silent state and all cursors back to 0."""
        self.curr_state = 0
        for key in self.custom_audio_index:
            self.custom_audio_index[key] = 0
        for key in self.custom_index:
            self.custom_index[key] = 0

    def mirror_index(self, size, index):
        """Map a monotonically increasing *index* onto a ping-pong traversal
        of ``range(size)`` (0,1,...,size-1,size-1,...,1,0,...), so the image
        cycle plays forward then backward instead of jumping at the seam.
        """
        turn = index // size
        res = index % size
        if turn % 2 == 0:
            return res
        else:
            return size - res - 1

    def get_audio_stream(self, audiotype):
        """Return the next audio chunk for *audiotype* and advance its cursor.

        When the track is exhausted the state switches to 1 (silent); the
        final chunk may be shorter than ``self.chunk``.
        """
        idx = self.custom_audio_index[audiotype]
        stream = self.custom_audio_cycle[audiotype][idx:idx + self.chunk]
        self.custom_audio_index[audiotype] += self.chunk
        if self.custom_audio_index[audiotype] >= self.custom_audio_cycle[audiotype].shape[0]:
            self.curr_state = 1  # current video does not loop; switch to the silent state
        return stream

    def set_curr_state(self, audiotype, reinit):
        """Switch playback to *audiotype*, optionally rewinding its cursors.

        Bug fix: only reset the cursors when *audiotype* was actually loaded —
        previously requesting reinit for an unloaded state (e.g. the silent
        state 0, which __loadcustom never registers) raised KeyError.
        """
        print('set_curr_state:', audiotype)
        self.curr_state = audiotype
        if reinit and audiotype in self.custom_audio_index:
            self.custom_audio_index[audiotype] = 0
            self.custom_index[audiotype] = 0