Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

我参考作者的实现方式自己实现了GaussianTalker接入进来,为什么asyncio.Queue会一直阻塞住呢?各位懂异步编程的可以帮帮我吗 #264

Open
shehuiwojiege opened this issue Sep 22, 2024 · 1 comment

Comments

@shehuiwojiege
Copy link

我参考作者的实现思路,实现了Player和Tracker,下面是我的简单demo,但我就是不明白为什么在Tracker的recv那里,异步Queue被阻塞了,希望各位帮我看下,不胜感谢:
'''python
import time, av
import aiohttp
import logging
import uvicorn
import asyncio
import threading
import multiprocessing
from aiortc import RTCPeerConnection, RTCSessionDescription
import fractions, asyncio
from typing import (
Set,
Tuple,
Union,
Optional,
)
from loguru import logger
from av.frame import Frame
from av.packet import Packet
from aiortc import MediaStreamTrack
AUDIO_PTIME = 0.020 # 20ms audio packetization
VIDEO_CLOCK_RATE = 90000
VIDEO_PTIME = 1 / 25 # 30fps
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_CLOCK_RATE)
SAMPLE_RATE = 48000
AUDIO_TIME_BASE = fractions.Fraction(1, SAMPLE_RATE)

def player_worker_thread(
quit_event,
loop,
linker,
audio_track,
video_track
):
linker.render_forever(quit_event, loop, audio_track, video_track)

class PlayerStreamTrack(MediaStreamTrack):

def __init__(self, player, kind):
    super().__init__()
    self.kind = kind
    self._player = player
    self._queue = asyncio.Queue()

_start: float
_timestamp: int

async def next_timestamp(self) -> Tuple[int, fractions.Fraction]:
    if self.readyState != 'live':
        raise Exception

    if self.kind == 'video':
        if hasattr(self, '_timestamp'):
            self._timestamp += int(VIDEO_PTIME * VIDEO_CLOCK_RATE)
            wait = self._start + (self._timestamp / VIDEO_CLOCK_RATE)
            if wait > 0:
                await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
            # print('video start:', self._start)
        return self._timestamp, VIDEO_TIME_BASE
    else:
        if hasattr(self, '_timestamp'):
            self._timestamp += int(AUDIO_PTIME * SAMPLE_RATE)
            wait = self._start + (self._timestamp / SAMPLE_RATE) - time.time()
            if wait > 0:
                await asyncio.sleep(wait)
        else:
            self._start = time.time()
            self._timestamp = 0
            # print('audio start:', self._start)
        return self._timestamp, VIDEO_TIME_BASE

async def recv(self) -> Union[Frame, Packet]:
    self._player._start(self)
    frame = await self._queue.get()
    print(11111, self.kind, frame)
    if frame is None:
        self.stop()
        raise Exception
    pts, time_base = await self.next_timestamp()
    frame.pts = pts
    frame.time_base = time_base
    print(self.kind, self._queue.qsize())
    return frame

def stop(self):
    super().stop()
    if self._player is not None:
        self._player._stop(self)
        self._player = None

class HumanPlayer:

def __init__(self, linker):
    self.__thread: Optional[threading.Thread] = None
    self.__thread_quit: Optional[threading.Event] = None
    self.__started: Set[PlayerStreamTrack] = set()
    self.__audio: Optional[PlayerStreamTrack] = None
    self.__video: Optional[PlayerStreamTrack] = None
    self.__audio = PlayerStreamTrack(self, 'audio')
    self.__video = PlayerStreamTrack(self, 'video')
    self.__linker = linker

@property
def audio(self) -> MediaStreamTrack:
    return self.__audio

@property
def video(self) -> MediaStreamTrack:
    return self.__video

def _start(self, track: PlayerStreamTrack) -> None:
    self.__started.add(track)
    if self.__thread is None:
        self.__log_debug('Starting worker thread')
        self.__thread_quit = threading.Event()
        self.__thread = threading.Thread(
            name='media-player',
            target=player_worker_thread,
            args=(
                self.__thread_quit,
                asyncio.new_event_loop(),
                self.__linker,
                self.__audio,
                self.__video
            )
        )
        self.__thread.start()
        time.sleep(2)

def _stop(self, track: PlayerStreamTrack) -> None:
    self.__started.discard(track)

    if not self.__started and self.__thread is not None:
        self.__log_debug("Stopping worker thread")
        self.__thread_quit.set()
        self.__thread.join()
        self.__thread = None

    if not self.__started and self.__linker is not None:
        self.__linker = None

def __log_debug(self, msg: str, *args) -> None:
    logger.debug(f'HumanPlayer {msg}', *args)

class GaussianTalkerLinker:

def __init__(self):
    con = av.open('1.mov')
    self.audio_generator = [f for f in con.decode(audio=0)]
    self.audio_idx = 0
    con = av.open('1.mov')
    self.video_generator = [f for f in con.decode(video=0)]
    self.video_idx = 0

def get_audio(self):
    if self.audio_idx >= len(self.audio_generator):
        self.audio_idx = 0
    frame = self.audio_generator[self.audio_idx]
    self.audio_idx += 1
    return frame

def get_video(self):
    if self.video_idx >= len(self.video_generator):
        self.video_idx = 0
    frame = self.video_generator[self.video_idx]
    self.video_idx += 1
    return frame

def render_forever(
        self,
        quit_event,
        loop=None,
        audio_track: PlayerStreamTrack = None,
        video_track: PlayerStreamTrack = None):
    while not quit_event.is_set():
        self.run_step(loop, audio_track, video_track)
        if video_track._queue.qsize() >= 5:
            time.sleep(0.04*video_track._queue.qsize()*0.8)
    print('linker thread stop')

def run_step(self, loop=None, audio_track=None, video_track=None):
    for i in range(2):
        new_frame = self.get_audio()
        asyncio.run_coroutine_threadsafe(audio_track._queue.put(new_frame), loop)
    # 模型推理
    new_frame = self.get_video()
    asyncio.run_coroutine_threadsafe(video_track._queue.put(new_frame), loop)

async def post(url,data):
try:
async with aiohttp.ClientSession() as session:
async with session.post(url,data=data) as response:
return await response.text()
except aiohttp.ClientError as e:
print(f'Error: {e}')

pcs = set()

async def run(push_url):
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("connectionstatechange")
async def on_connectionstatechange():
print("Connection state is %s" % pc.connectionState)
if pc.connectionState == "failed":
await pc.close()
pcs.discard(pc)
player = HumanPlayer(linker)
audio_sender = pc.addTrack(player.audio)
video_sender = pc.addTrack(player.video)
await pc.setLocalDescription(await pc.createOffer())
answer = await post(push_url, pc.localDescription.sdp)
await pc.setRemoteDescription(RTCSessionDescription(sdp=answer, type='answer'))

if name == 'main':
linker = GaussianTalkerLinker()
multiprocessing.set_start_method('spawn')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run('http://192.168.31.19:1985/rtc/v1/whip/?app=live&stream=livestream'))
loop.run_forever()
'''

@shehuiwojiege
Copy link
Author

异步编程实在搞不定呀,它们不都是一个loop的吗,为什么在recv那里的queue.get会阻塞住呀?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant