From 959ecf9be89272356256d1317ad778a1f16b2249 Mon Sep 17 00:00:00 2001
From: lipku
Date: Sat, 5 Oct 2024 17:25:01 +0800
Subject: [PATCH] add llm stream func

---
 README.md        |   2 +
 app.py           |  65 +++++++++++++++++---
 requirements.txt |   1 +
 ttsreal.py       | 155 +++++++++++++++++++++++++----------------------
 4 files changed, 141 insertions(+), 82 deletions(-)

diff --git a/README.md b/README.md
index 79d22fc..c67a7a0 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,8 @@ Real time interactive streaming digital human, realize audio video synchronous
 [ernerf demo](https://www.bilibili.com/video/BV1PM4m1y7Q2/) [musetalk demo](https://www.bilibili.com/video/BV1gm421N7vQ/) [wav2lip demo](https://www.bilibili.com/video/BV1Bw4m1e74P/)
 
+## To avoid confusion with 3D digital humans, the original project metahuman-stream has been renamed to livetalking; the original links remain valid
+
 ## Features
 1. Supports multiple digital human models: ernerf, musetalk, wav2lip
 2. Supports voice cloning
diff --git a/app.py b/app.py
index 5dc4665..8bb3f1d 100644
--- a/app.py
+++ b/app.py
@@ -24,6 +24,7 @@
 import argparse
 import shutil
 import asyncio
+import string
 
 app = Flask(__name__)
 
@@ -52,14 +53,58 @@ def echo_socket(ws):
                 nerfreal.put_msg_txt(message)
 
 
-def llm_response(message):
-    from llm.LLM import LLM
-    # llm = LLM().init_model('Gemini', model_path= 'gemini-pro',api_key='Your API Key', proxy_url=None)
-    # llm = LLM().init_model('ChatGPT', model_path= 'gpt-3.5-turbo',api_key='Your API Key')
-    llm = LLM().init_model('VllmGPT', model_path= 'THUDM/chatglm3-6b')
-    response = llm.chat(message)
-    print(response)
-    return response
+# def llm_response(message):
+#     from llm.LLM import LLM
+#     # llm = LLM().init_model('Gemini', model_path= 'gemini-pro',api_key='Your API Key', proxy_url=None)
+#     # llm = LLM().init_model('ChatGPT', model_path= 'gpt-3.5-turbo',api_key='Your API Key')
+#     llm = LLM().init_model('VllmGPT', model_path= 'THUDM/chatglm3-6b')
+#     response = llm.chat(message)
+#     print(response)
+#     return response
+
+def llm_response(message,nerfreal):
+    start = time.perf_counter()
+    from openai import OpenAI
+    client = OpenAI(
+        # If the DASHSCOPE_API_KEY environment variable is not set, replace this with your API key
+        api_key=os.getenv("DASHSCOPE_API_KEY"),
+        # base_url of the DashScope OpenAI-compatible endpoint
+        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+    )
+    end = time.perf_counter()
+    print(f"llm Time init: {end-start}s")
+    completion = client.chat.completions.create(
+        model="qwen-plus",
+        messages=[{'role': 'system', 'content': 'You are a helpful assistant.'},
+                  {'role': 'user', 'content': message}],
+        stream=True,
+        # With this option, token usage is reported in the final chunk of the stream
+        stream_options={"include_usage": True}
+    )
+    result=""
+    first = True
+    for chunk in completion:
+        if len(chunk.choices)>0:
+            #print(chunk.choices[0].delta.content)
+            if first:
+                end = time.perf_counter()
+                print(f"llm Time to first chunk: {end-start}s")
+                first = False
+            msg = chunk.choices[0].delta.content
+            lastpos=0
+            #msglist = re.split('[,.!;:,。!?]',msg)
+            for i, char in enumerate(msg):
+                if char in ",.!;:,。!?:;":
+                    result = result+msg[lastpos:i+1]
+                    lastpos = i+1
+                    if len(result)>10:
+                        print(result)
+                        nerfreal.put_msg_txt(result)
+                        result=""
+            result = result+msg[lastpos:]
+    end = time.perf_counter()
+    print(f"llm Time to last chunk: {end-start}s")
+    nerfreal.put_msg_txt(result)
 
 @sockets.route('/humanchat')
 def chat_socket(ws):
@@ -147,8 +192,8 @@ async def human(request):
     if params['type']=='echo':
         nerfreals[sessionid].put_msg_txt(params['text'])
     elif params['type']=='chat':
-        res=await asyncio.get_event_loop().run_in_executor(None, llm_response(params['text']))
-        nerfreals[sessionid].put_msg_txt(res)
+        res=await asyncio.get_event_loop().run_in_executor(None, llm_response, params['text'],nerfreals[sessionid])
+        #nerfreals[sessionid].put_msg_txt(res)
 
     return web.Response(
         content_type="application/json",
diff --git a/requirements.txt b/requirements.txt
index b01763e..8f72658 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -40,3 +40,4 @@
 diffusers
 accelerate
 librosa
+openai
diff --git a/ttsreal.py b/ttsreal.py
index cff9fc3..665ff46 100644
--- a/ttsreal.py
+++ b/ttsreal.py
@@ -37,7 +37,8 @@ class BaseTTS:
         self.state = State.PAUSE
 
     def put_msg_txt(self,msg):
-        self.msgqueue.put(msg)
+        if len(msg)>0:
+            self.msgqueue.put(msg)
 
     def render(self,quit_event):
         process_thread = Thread(target=self.process_tts, args=(quit_event,))
@@ -99,19 +100,22 @@ class EdgeTTS(BaseTTS):
         return stream
 
     async def __main(self,voicename: str, text: str):
-        communicate = edge_tts.Communicate(text, voicename)
+        try:
+            communicate = edge_tts.Communicate(text, voicename)
 
-        #with open(OUTPUT_FILE, "wb") as file:
-        first = True
-        async for chunk in communicate.stream():
-            if first:
-                first = False
-            if chunk["type"] == "audio" and self.state==State.RUNNING:
-                #self.push_audio(chunk["data"])
-                self.input_stream.write(chunk["data"])
-                #file.write(chunk["data"])
-            elif chunk["type"] == "WordBoundary":
-                pass
+            #with open(OUTPUT_FILE, "wb") as file:
+            first = True
+            async for chunk in communicate.stream():
+                if first:
+                    first = False
+                if chunk["type"] == "audio" and self.state==State.RUNNING:
+                    #self.push_audio(chunk["data"])
+                    self.input_stream.write(chunk["data"])
+                    #file.write(chunk["data"])
+                elif chunk["type"] == "WordBoundary":
+                    pass
+        except Exception as e:
+            print(e)
 
 ###########################################################################################
 class VoitsTTS(BaseTTS):
@@ -143,28 +147,31 @@ class VoitsTTS(BaseTTS):
         # req["emotion"] = emotion
         # #req["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
         # req["streaming_mode"] = True
-        res = requests.post(
-            f"{server_url}/tts",
-            json=req,
-            stream=True,
-        )
-        end = time.perf_counter()
-        print(f"gpt_sovits Time to make POST: {end-start}s")
+        try:
+            res = requests.post(
+                f"{server_url}/tts",
+                json=req,
+                stream=True,
+            )
+            end = time.perf_counter()
+            print(f"gpt_sovits Time to make POST: {end-start}s")
 
-        if res.status_code != 200:
-            print("Error:", res.text)
-            return
-
-        first = True
-        for chunk in res.iter_content(chunk_size=16000): # 1280 32K*20ms*2
-            if first:
-                end = time.perf_counter()
-                print(f"gpt_sovits Time to first chunk: {end-start}s")
-                first = False
-            if chunk and self.state==State.RUNNING:
-                yield chunk
-
-        print("gpt_sovits response.elapsed:", res.elapsed)
+            if res.status_code != 200:
+                print("Error:", res.text)
+                return
+
+            first = True
+
+            for chunk in res.iter_content(chunk_size=12800): # 12800 = 32K*200ms*2, i.e. 200ms of 16-bit audio at 32kHz
+                if first:
+                    end = time.perf_counter()
+                    print(f"gpt_sovits Time to first chunk: {end-start}s")
+                    first = False
+                if chunk and self.state==State.RUNNING:
+                    yield chunk
+            #print("gpt_sovits response.elapsed:", res.elapsed)
+        except Exception as e:
+            print(e)
 
     def stream_tts(self,audio_stream):
         for chunk in audio_stream:
@@ -199,26 +206,28 @@ class CosyVoiceTTS(BaseTTS):
             'tts_text': text,
             'prompt_text': reftext
         }
-        files = [('prompt_wav', ('prompt_wav', open(reffile, 'rb'), 'application/octet-stream'))]
-        res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True)
-
-        end = time.perf_counter()
-        print(f"cosy_voice Time to make POST: {end-start}s")
-
-        if res.status_code != 200:
-            print("Error:", res.text)
-            return
+        try:
+            files = [('prompt_wav', ('prompt_wav', open(reffile, 'rb'), 'application/octet-stream'))]
+            res = requests.request("GET", f"{server_url}/inference_zero_shot", data=payload, files=files, stream=True)
 
-        first = True
-        for chunk in res.iter_content(chunk_size=16000): # 1280 32K*20ms*2
-            if first:
-                end = time.perf_counter()
-                print(f"cosy_voice Time to first chunk: {end-start}s")
-                first = False
-            if chunk and self.state==State.RUNNING:
-                yield chunk
-
-        print("cosy_voice response.elapsed:", res.elapsed)
+            end = time.perf_counter()
+            print(f"cosy_voice Time to make POST: {end-start}s")
+
+            if res.status_code != 200:
+                print("Error:", res.text)
+                return
+
+            first = True
+
+            for chunk in res.iter_content(chunk_size=8820): # 8820 = 22.05K*200ms*2, i.e. 200ms of 16-bit audio at 22.05kHz
+                if first:
+                    end = time.perf_counter()
+                    print(f"cosy_voice Time to first chunk: {end-start}s")
+                    first = False
+                if chunk and self.state==State.RUNNING:
+                    yield chunk
+        except Exception as e:
+            print(e)
 
     def stream_tts(self,audio_stream):
         for chunk in audio_stream:
@@ -261,28 +270,30 @@ class XTTS(BaseTTS):
         speaker["text"] = text
         speaker["language"] = language
         speaker["stream_chunk_size"] = stream_chunk_size # you can reduce it to get faster response, but degrade quality
-        res = requests.post(
-            f"{server_url}/tts_stream",
-            json=speaker,
-            stream=True,
-        )
-        end = time.perf_counter()
-        print(f"xtts Time to make POST: {end-start}s")
+        try:
+            res = requests.post(
+                f"{server_url}/tts_stream",
+                json=speaker,
+                stream=True,
+            )
+            end = time.perf_counter()
+            print(f"xtts Time to make POST: {end-start}s")
 
-        if res.status_code != 200:
-            print("Error:", res.text)
-            return
+            if res.status_code != 200:
+                print("Error:", res.text)
+                return
 
-        first = True
-        for chunk in res.iter_content(chunk_size=960): #24K*20ms*2
-            if first:
-                end = time.perf_counter()
-                print(f"xtts Time to first chunk: {end-start}s")
-                first = False
-            if chunk:
-                yield chunk
-
-        print("xtts response.elapsed:", res.elapsed)
+            first = True
+
+            for chunk in res.iter_content(chunk_size=9600): # 9600 = 24K*200ms*2, i.e. 200ms of 16-bit audio at 24kHz
+                if first:
+                    end = time.perf_counter()
+                    print(f"xtts Time to first chunk: {end-start}s")
+                    first = False
+                if chunk:
+                    yield chunk
+        except Exception as e:
+            print(e)
 
     def stream_tts(self,audio_stream):
         for chunk in audio_stream: