"""aiortc media tracks and player wrapper for a real-time rendered avatar.

``HumanPlayer`` owns one audio and one video ``PlayerStreamTrack`` and starts a
worker thread that calls ``container.render(...)`` the first time a track is
read.  Each track paces the frames taken from its queue against a wall-clock
based RTP timestamp.
"""

import asyncio
import fractions
import json
import logging
import threading
import time
from typing import Tuple, Dict, Optional, Set, Union

import numpy as np
from aiortc import (
    MediaStreamTrack,
)
from aiortc.mediastreams import MediaStreamError
from av import AudioFrame
from av.frame import Frame
from av.packet import Packet

AUDIO_PTIME = 0.020  # 20 ms audio packetization
VIDEO_CLOCK_RATE = 90000
VIDEO_PTIME = 1 / 25  # 25 fps (40 ms per video frame)
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)
SAMPLE_RATE = 16000
AUDIO_TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)

logging.basicConfig()
logger = logging.getLogger(__name__)


class PlayerStreamTrack(MediaStreamTrack):
    """
    A media track that delivers frames taken from an internal asyncio queue.

    Frames are expected to be placed on ``_queue`` by code outside this class
    (presumably the container's ``render`` loop running in the worker thread
    -- confirm against the container implementation).  A ``None`` item on the
    queue is treated as end-of-stream.
    """

    # Declared without values on purpose: next_timestamp() uses
    # hasattr(self, "_timestamp") to detect the very first frame.
    _start: float
    _timestamp: int

    def __init__(self, player, kind):
        """
        :param player: owner object providing ``_start(track)`` and
            ``_stop(track)``.
        :param kind: ``'video'`` selects video pacing; any other value is
            paced as audio.
        """
        super().__init__()  # don't forget this!
        self.kind = kind
        self._player = player
        self._queue = asyncio.Queue()
        self.timelist = []  # timestamps of the most recent packets
        if self.kind == 'video':
            # Counters used by recv() to report the delivered frame rate.
            self.framecount = 0
            self.lasttime = time.perf_counter()
            self.totaltime = 0

    async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
        """
        Advance the track clock by one packet and sleep until it is due.

        The first call records the wall-clock start time and returns
        timestamp 0 without sleeping.  Later calls add one packet duration
        and sleep for whatever time remains until that timestamp is due.

        :return: ``(pts, time_base)`` for the next frame.
        :raises MediaStreamError: if the track is no longer live.
        """
        if self.readyState != "live":
            raise MediaStreamError

        if self.kind == 'video':
            label = 'video start:'
            clock_rate, ptime, time_base = (
                VIDEO_CLOCK_RATE, VIDEO_PTIME, VIDEO_TIME_BASE)
        else:  # audio
            label = 'audio start:'
            clock_rate, ptime, time_base = (
                SAMPLE_RATE, AUDIO_PTIME, AUDIO_TIME_BASE)

        if hasattr(self, "_timestamp"):
            self._timestamp += int(ptime * clock_rate)
            # Pace against the absolute start time so that per-frame sleep
            # inaccuracies do not accumulate.
            wait = self._start + (self._timestamp / clock_rate) - time.time()
            if wait > 0:
                await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
            self.timelist.append(self._start)
            print(label, self._start)
        return self._timestamp, time_base

    async def recv(self) -> Union[Frame, Packet]:
        """
        Return the next queued frame, stamped with its pts and time base.

        Ensures the owning player's worker thread is running, waits for a
        queue item, then paces it with :meth:`next_timestamp`.  For video
        tracks the average delivered frame rate is printed every 100 frames.

        :raises MediaStreamError: if the track has been stopped, or when the
            end-of-stream sentinel (``None``) is received, in which case the
            track is stopped first.
        """
        # stop() clears _player, so a recv() after stop() must fail cleanly
        # rather than with an AttributeError on None.
        if self.readyState != "live" or self._player is None:
            raise MediaStreamError

        self._player._start(self)
        frame = await self._queue.get()

        # Check the sentinel before touching the frame or advancing the
        # clock; assigning pts on None would raise AttributeError and the
        # track would never be stopped.
        if frame is None:
            self.stop()
            raise MediaStreamError

        pts, time_base = await self.next_timestamp()
        frame.pts = pts
        frame.time_base = time_base

        if self.kind == 'video':
            now = time.perf_counter()
            self.totaltime += (now - self.lasttime)
            self.framecount += 1
            self.lasttime = time.perf_counter()
            if self.framecount == 100:
                print(f"------actual avg final fps:{self.framecount/self.totaltime:.4f}")
                self.framecount = 0
                self.totaltime = 0
        return frame

    def stop(self):
        """Stop the track and detach it from its player (idempotent)."""
        super().stop()
        if self._player is not None:
            self._player._stop(self)
            self._player = None


def player_worker_thread(
    quit_event,
    loop,
    container,
    audio_track,
    video_track
):
    """
    Thread target: hand control to ``container.render``.

    :param quit_event: ``threading.Event`` set by ``HumanPlayer._stop`` to
        request shutdown.
    :param loop: asyncio event loop captured when the thread was started.
    :param container: object exposing
        ``render(quit_event, loop, audio_track, video_track)``.
    :param audio_track: the player's audio ``PlayerStreamTrack``.
    :param video_track: the player's video ``PlayerStreamTrack``.
    """
    container.render(quit_event, loop, audio_track, video_track)


class HumanPlayer:
    """
    Owns an audio and a video track and the worker thread that feeds them.

    The worker thread is started lazily on the first ``_start`` call and
    stopped once every started track has been stopped.
    """

    def __init__(
        self, nerfreal, format=None, options=None, timeout=None, loop=False,
        decode=True
    ):
        """
        :param nerfreal: object providing
            ``render(quit_event, loop, audio_track, video_track)``.
        :param format: unused; kept for signature compatibility.
        :param options: unused; kept for signature compatibility.
        :param timeout: unused; kept for signature compatibility.
        :param loop: unused; kept for signature compatibility.
        :param decode: unused; kept for signature compatibility.
        """
        self.__thread: Optional[threading.Thread] = None
        self.__thread_quit: Optional[threading.Event] = None

        self.__started: Set[PlayerStreamTrack] = set()
        self.__audio: Optional[PlayerStreamTrack] = None
        self.__video: Optional[PlayerStreamTrack] = None

        self.__audio = PlayerStreamTrack(self, kind="audio")
        self.__video = PlayerStreamTrack(self, kind="video")

        self.__container = nerfreal

    @property
    def audio(self) -> MediaStreamTrack:
        """
        The audio :class:`aiortc.MediaStreamTrack` owned by this player.
        """
        return self.__audio

    @property
    def video(self) -> MediaStreamTrack:
        """
        The video :class:`aiortc.MediaStreamTrack` owned by this player.
        """
        return self.__video

    def _start(self, track: PlayerStreamTrack) -> None:
        """
        Register ``track`` as active and start the worker thread if needed.

        Called from ``PlayerStreamTrack.recv``; the event loop obtained here
        is passed to the worker thread.
        """
        self.__started.add(track)
        if self.__thread is None:
            self.__log_debug("Starting worker thread")
            self.__thread_quit = threading.Event()
            self.__thread = threading.Thread(
                name="media-player",
                target=player_worker_thread,
                args=(
                    self.__thread_quit,
                    asyncio.get_event_loop(),
                    self.__container,
                    self.__audio,
                    self.__video
                ),
            )
            self.__thread.start()

    def _stop(self, track: PlayerStreamTrack) -> None:
        """
        Unregister ``track``; shut the worker down when no track remains.

        Once the last started track is removed, the quit event is set, the
        worker thread is joined, and the container reference is dropped.
        """
        self.__started.discard(track)

        if not self.__started and self.__thread is not None:
            self.__log_debug("Stopping worker thread")
            self.__thread_quit.set()
            self.__thread.join()
            self.__thread = None

        if not self.__started and self.__container is not None:
            # The container is not closed here; only the reference is
            # released.
            self.__container = None

    def __log_debug(self, msg: str, *args) -> None:
        """Emit a debug log record prefixed with ``HumanPlayer``."""
        logger.debug(f"HumanPlayer {msg}", *args)