159 lines
4.7 KiB
Python
159 lines
4.7 KiB
Python
|
|
||
|
import asyncio
|
||
|
import json
|
||
|
import logging
|
||
|
import threading
|
||
|
import time
|
||
|
from typing import Tuple, Dict, Optional, Set, Union
|
||
|
from av.frame import Frame
|
||
|
from av.packet import Packet
|
||
|
import fractions
|
||
|
|
||
|
# Timing constants used to pace and timestamp outgoing frames.
AUDIO_PTIME = 0.020  # seconds per audio frame (20 ms packetization)
VIDEO_CLOCK_RATE = 90000  # video timestamp ticks per second
VIDEO_PTIME = 1 / 25  # seconds per video frame (40 ms, i.e. 25 fps)
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)
SAMPLE_RATE = 16000  # audio timestamp ticks per second
AUDIO_TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)
|
||
|
|
||
|
#from aiortc.contrib.media import MediaPlayer, MediaRelay
|
||
|
#from aiortc.rtcrtpsender import RTCRtpSender
|
||
|
from aiortc import (
|
||
|
MediaStreamTrack,
|
||
|
)
|
||
|
|
||
|
# Configure the root logger with the stdlib defaults, then create the
# module-level logger used by the classes below.
logging.basicConfig()
logger = logging.getLogger(__name__)
|
||
|
|
||
|
|
||
|
class PlayerStreamTrack(MediaStreamTrack):
    """
    A media track whose frames arrive through an asyncio queue.

    One instance carries either audio or video, selected by ``kind``.
    :meth:`recv` takes the next frame from ``self._queue``, stamps it with a
    steadily increasing ``pts`` and paces delivery to the frame interval of
    the track's kind.
    """

    # Both are set lazily by next_timestamp() on its first call; the absence
    # of ``_timestamp`` is what marks "no frame has been timestamped yet".
    _start: float
    _timestamp: int

    def __init__(self, player, kind):
        """
        :param player: owner object; its ``_start(track)`` / ``_stop(track)``
            methods are called from :meth:`recv` and :meth:`stop`.
        :param kind: ``'video'`` selects the video clock; any other value is
            treated as audio.
        """
        super().__init__()  # don't forget this!
        self.kind = kind
        self._player = player
        self._queue = asyncio.Queue()

    async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
        """
        Advance the track clock by one frame and wait until that frame is due.

        The first call records the wall-clock start time and returns
        timestamp 0 without sleeping. Every later call adds one frame
        interval to the timestamp and sleeps until ``start + timestamp``.

        :return: ``(timestamp, time_base)`` for the next frame.
        :raises Exception: if the track is no longer live.
        """
        if self.readyState != "live":
            raise Exception("track is not live")

        # Pick the clock parameters once instead of duplicating the logic.
        if self.kind == 'video':
            ptime, clock_rate, time_base = VIDEO_PTIME, VIDEO_CLOCK_RATE, VIDEO_TIME_BASE
        else:  # audio
            ptime, clock_rate, time_base = AUDIO_PTIME, SAMPLE_RATE, AUDIO_TIME_BASE

        if hasattr(self, "_timestamp"):
            self._timestamp += int(ptime * clock_rate)
            # May be negative when we are running late; asyncio.sleep then
            # returns without delaying.
            wait = self._start + (self._timestamp / clock_rate) - time.time()
            await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
        return self._timestamp, time_base

    async def recv(self) -> Union[Frame, Packet]:
        """
        Return the next queued frame, stamped with ``pts`` and ``time_base``.

        Asks the owning player to start on every call. A ``None`` item in
        the queue is treated as end-of-stream: the track is stopped and an
        exception is raised.

        :raises Exception: if the track was already stopped, is not live, or
            the end-of-stream marker was received.
        """
        # NOTE(review): aiortc's own tracks raise MediaStreamError when they
        # end; plain Exception is kept so existing callers are unaffected.
        if self._player is None:
            # stop() has already detached the player.
            raise Exception("track has been stopped")
        self._player._start(self)
        frame = await self._queue.get()
        # Check the sentinel before touching the frame; assigning pts on None
        # would raise AttributeError and skip the stop() call.
        if frame is None:
            self.stop()
            raise Exception("end of stream")
        pts, time_base = await self.next_timestamp()
        frame.pts = pts
        frame.time_base = time_base
        return frame

    def stop(self):
        """Stop the track and detach it from its player (safe to call twice)."""
        super().stop()
        if self._player is not None:
            self._player._stop(self)
            self._player = None
|
||
|
|
||
|
def player_worker_thread(
    quit_event,
    loop,
    container,
    audio_track,
    video_track
):
    """
    Thread target that delegates to ``container.render``.

    The quit event, event loop and both tracks are passed through to
    ``render`` unchanged and in that order; its return value is discarded.
    """
    render = container.render
    render(quit_event, loop, audio_track, video_track)
|
||
|
|
||
|
class HumanPlayer:
    """
    Owns one audio and one video :class:`PlayerStreamTrack` and runs a
    background worker thread for the supplied ``nerfreal`` object while at
    least one of those tracks is started.
    """

    def __init__(
        self, nerfreal, format=None, options=None, timeout=None, loop=False, decode=True
    ):
        """
        :param nerfreal: object handed to the worker thread as its container.

        ``format``, ``options``, ``timeout``, ``loop`` and ``decode`` are
        accepted but not used by this constructor.
        """
        # Worker thread state; both stay None until the first track starts.
        self.__thread: Optional[threading.Thread] = None
        self.__thread_quit: Optional[threading.Event] = None

        # Tracks that have been started and not stopped since.
        self.__started: Set[PlayerStreamTrack] = set()

        # Both tracks are always created, one per kind.
        self.__audio: Optional[PlayerStreamTrack] = PlayerStreamTrack(self, kind="audio")
        self.__video: Optional[PlayerStreamTrack] = PlayerStreamTrack(self, kind="video")

        self.__container = nerfreal

    @property
    def audio(self) -> MediaStreamTrack:
        """
        The audio :class:`aiortc.MediaStreamTrack` owned by this player.
        """
        return self.__audio

    @property
    def video(self) -> MediaStreamTrack:
        """
        The video :class:`aiortc.MediaStreamTrack` owned by this player.
        """
        return self.__video

    def _start(self, track: PlayerStreamTrack) -> None:
        """Register ``track`` as started and launch the worker thread if it is not running."""
        self.__started.add(track)
        if self.__thread is not None:
            # Worker already running; nothing more to do.
            return

        self.__log_debug("Starting worker thread")
        quit_event = threading.Event()
        self.__thread_quit = quit_event
        worker_args = (
            quit_event,
            asyncio.get_event_loop(),
            self.__container,
            self.__audio,
            self.__video,
        )
        self.__thread = threading.Thread(
            name="media-player",
            target=player_worker_thread,
            args=worker_args,
        )
        self.__thread.start()

    def _stop(self, track: PlayerStreamTrack) -> None:
        """Unregister ``track``; once no track is left, stop the worker and drop the container."""
        self.__started.discard(track)
        if self.__started:
            # Another track is still in use; keep everything running.
            return

        if self.__thread is not None:
            self.__log_debug("Stopping worker thread")
            self.__thread_quit.set()
            self.__thread.join()
            self.__thread = None

        if self.__container is not None:
            # The container is only dereferenced here, not closed.
            self.__container = None

    def __log_debug(self, msg: str, *args) -> None:
        """Emit a debug record prefixed with the class name."""
        logger.debug(f"HumanPlayer {msg}", *args)
|