Skip to content

Commit

Permalink
Fix resync issues not sorting the video packets correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
rocky4546 committed Dec 12, 2023
1 parent 453f49c commit f3ad6ec
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
2 changes: 0 additions & 2 deletions lib/common/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,13 @@ def wrapper_func(self, *args, **kwargs):
ex_save = None
# arg0 = uri, arg1=retries
if len(args) == 0:
self.logger.warning('get uri called with no args f:{}'.format(f))
arg0 = 'None'
retries = 2
elif len(args) == 1:
arg0 = args[0]
retries = 2
else:
arg0 = args[0]
self.logger.warning('get uri called from {} with retires={}'.format(f, args[1]))
retries = args[1]
i = retries
is_done = 0
Expand Down
2 changes: 1 addition & 1 deletion lib/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import lib.common.exceptions as exceptions

VERSION = '0.9.14.00-RC06'
VERSION = '0.9.14.00-RC07'
CABERNET_URL = 'https://github.com/cabernetwork/cabernet'
CABERNET_ID = 'cabernet'
CABERNET_REPO = 'manifest.json'
Expand Down
50 changes: 29 additions & 21 deletions lib/streams/m3u8_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
IN_QUEUE = Queue()
OUT_QUEUE = Queue()
TERMINATE_REQUESTED = False
MAX_STREAM_QUEUE_SIZE = 40
MAX_STREAM_QUEUE_SIZE = 20
STREAM_QUEUE = Queue()
OUT_QUEUE_LIST = []
HTTP_TIMEOUT=8
Expand All @@ -59,6 +59,7 @@

class M3U8GetUriData(Thread):
def __init__(self, _queue_item, _uid_counter, _config):
global TERMINATE_REQUESTED
Thread.__init__(self)
self.queue_item = _queue_item
self.uid_counter = _uid_counter
Expand All @@ -68,25 +69,27 @@ def __init__(self, _queue_item, _uid_counter, _config):
self.pts_validation = None
if _config[M3U8Queue.config_section]['player-enable_pts_filter']:
self.pts_validation = PTSValidation(_config, M3U8Queue.channel_dict)
self.start()
if not TERMINATE_REQUESTED:
self.start()

def run(self):
global UID_COUNTER
global UID_PROCESSED
global STREAM_QUEUE
self.logger.debug('M3U8GetUriData started {} {} {}'.format(self.queue_item['data']['uri'], os.getpid(), threading.get_ident()))
global TERMINATE_REQUESTED
self.logger.trace('M3U8GetUriData started {} {} {}'.format(self.queue_item['data']['uri'], os.getpid(), threading.get_ident()))
m3u8_data = self.process_m3u8_item(self.queue_item)
PROCESSED_URLS[self.uid_counter] = m3u8_data
STREAM_QUEUE.put({'uri_dt': 'check_processed_list'})
self.logger.debug('M3U8GetUriData terminated COUNTER {} {} {}'.format(self.uid_counter, os.getpid(), threading.get_ident()))
if not TERMINATE_REQUESTED:
PROCESSED_URLS[self.uid_counter] = m3u8_data
STREAM_QUEUE.put({'uri_dt': 'check_processed_list'})
self.logger.trace('M3U8GetUriData terminated COUNTER {} {} {}'.format(self.uid_counter, os.getpid(), threading.get_ident()))
m3u8_data = None
self.queue_item = None
self.uid_counter = None
self.video = None
self.pts_validation = None
self.logger = None



@handle_url_except()
def get_uri_data(self, _uri, _retries):
Expand Down Expand Up @@ -236,13 +239,6 @@ def process_m3u8_item(self, _queue_item):
'atsc': None
}

M3U8Queue.pts_resync.resequence_pts(self.video)
if self.video.data is None:
PLAY_LIST[uri_dt]['played'] = True
return{'uri': data['uri'],
'data': data,
'stream': self.video.data,
'atsc': None}
atsc_default_msg = self.atsc_processing()
PLAY_LIST[uri_dt]['played'] = True
if self.uid_counter > UID_PROCESSED+1:
Expand Down Expand Up @@ -277,6 +273,8 @@ class M3U8Queue(Thread):

def __init__(self, _config, _channel_dict):
Thread.__init__(self)
self.video = Video(_config)
self.q_action = None
self.logger = logging.getLogger(__name__ + str(threading.get_ident()))
self.config = _config
self.namespace = _channel_dict['namespace'].lower()
Expand Down Expand Up @@ -305,9 +303,11 @@ def run(self):
global UID_COUNTER
global UID_PROCESSED
global PARALLEL_DOWNLOADS
global PROCESSED_URLS
try:
while not TERMINATE_REQUESTED:
queue_item = STREAM_QUEUE.get()
self.q_action = queue_item['uri_dt']
if queue_item['uri_dt'] == 'terminate':
self.logger.debug('Received terminate from internalproxy {}'.format(os.getpid()))
TERMINATE_REQUESTED = True
Expand All @@ -320,22 +320,24 @@ def run(self):
'atsc': None})
continue
elif queue_item['uri_dt'] == 'check_processed_list':
self.logger.warning('#### Received check_processed_list {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {} QUEUE: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize()))
self.logger.debug('#### Received check_processed_list request {} Received: {} Processed: {} Processed_Queue: {} Incoming_Queue: {}'
.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize()))
self.check_processed_list()
time.sleep(.1)
continue

self.logger.warning('**** Received check_processed_list {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS)))
self.logger.debug('**** Running check_processed_list {} Received: {} Processed: {} Processed_Queue: {} Incoming_Queue: {}'
.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize()))
self.check_processed_list()
while UID_COUNTER - UID_PROCESSED - len(PROCESSED_URLS) > PARALLEL_DOWNLOADS+1:
self.logger.warning('SLOWING PROCESSING: {} PROCESSED: {} PROCESSED_Q: {} QUEUE: {}'.format(UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize()))
self.logger.debug('Slowed Processing: {} Received: {} Processed: {} Processed_Queue: {} Incoming_Queue: {}'
.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize()))
time.sleep(.5)
self.check_processed_list()
time.sleep(.1)
if TERMINATE_REQUESTED:
break
self.process_queue = M3U8GetUriData(queue_item, UID_COUNTER, self.config)
time.sleep(.1)

self.logger.warning('thread running for {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS)))
UID_COUNTER += 1
except (KeyboardInterrupt, EOFError) as ex:
TERMINATE_REQUESTED = True
Expand Down Expand Up @@ -366,6 +368,7 @@ def run(self):
'data': None,
'stream': None,
'atsc': None})
PROCESSED_URLS.clear()
time.sleep(0.01)
TERMINATE_REQUESTED = True
self.logger.debug('M3U8Queue terminated {}'.format(os.getpid()))
Expand All @@ -376,8 +379,13 @@ def check_processed_list(self):
global PROCESSED_URLS
if len(PROCESSED_URLS) > 0:
first_key = sorted(PROCESSED_URLS.keys())[0]
self.logger.warning('FIRST KEY={} UID_PROCESSED={}'.format(first_key, UID_PROCESSED))
if first_key == UID_PROCESSED:
self.video.data = PROCESSED_URLS[first_key]['stream']
M3U8Queue.pts_resync.resequence_pts(self.video)
if self.video.data is None and self.q_action != 'check_processed_list':
PLAY_LIST[self.q_action]['played'] = True
PROCESSED_URLS[first_key]['stream'] = self.video.data

out_queue_put(PROCESSED_URLS[first_key])
del PROCESSED_URLS[first_key]
UID_PROCESSED += 1
Expand Down

0 comments on commit f3ad6ec

Please sign in to comment.