diff --git a/lib/common/decorators.py b/lib/common/decorators.py index 80ec282..4bdaf51 100644 --- a/lib/common/decorators.py +++ b/lib/common/decorators.py @@ -45,12 +45,19 @@ def handle_url_except(f=None, timeout=None): def wrapper_func(self, *args, **kwargs): ex_save = None + # arg0 = uri, arg1=retries if len(args) == 0: self.logger.warning('get uri called with no args f:{}'.format(f)) arg0 = 'None' + retries = 2 + elif len(args) == 1: + arg0 = args[0] + retries = 2 else: arg0 = args[0] - i = 2 + self.logger.warning('get uri called from {} with retires={}'.format(f, args[1])) + retries = args[1] + i = retries is_done = 0 while i > is_done: i -= 1 diff --git a/lib/common/utils.py b/lib/common/utils.py index 45eddf9..bd8cbed 100644 --- a/lib/common/utils.py +++ b/lib/common/utils.py @@ -37,7 +37,7 @@ import lib.common.exceptions as exceptions -VERSION = '0.9.14.00-RC02' +VERSION = '0.9.14.00-RC03' CABERNET_URL = 'https://github.com/cabernetwork/cabernet' CABERNET_ID = 'cabernet' CABERNET_REPO = 'manifest.json' diff --git a/lib/plugins/plugin_channels.py b/lib/plugins/plugin_channels.py index 3c08529..eed1117 100644 --- a/lib/plugins/plugin_channels.py +++ b/lib/plugins/plugin_channels.py @@ -164,7 +164,7 @@ def clean_group_name(self, group_name): return re.sub('[ +&*%$#@!:;,<>?]', '', group_name) @handle_url_except() - def get_thumbnail_size(self, _thumbnail, _ch_uid, ): + def get_thumbnail_size(self, _thumbnail, _retries, _ch_uid, ): thumbnail_size = (0, 0) if _thumbnail is None or _thumbnail == '': return thumbnail_size @@ -194,7 +194,7 @@ def get_thumbnail_size(self, _thumbnail, _ch_uid, ): return thumbnail_size @handle_url_except - def get_best_stream(self, _url, _channel_id, _referer=None): + def get_best_stream(self, _url, _retries, _channel_id, _referer=None): if self.config_obj.data[self.config_section]['player-stream_type'] == 'm3u8redirect': return _url diff --git a/lib/streams/internal_proxy.py b/lib/streams/internal_proxy.py index 3217fca..1f046f1 100644 --- a/lib/streams/internal_proxy.py +++ b/lib/streams/internal_proxy.py @@ -42,7 +42,7 @@ from .stream import Stream MAX_OUT_QUEUE_SIZE = 30 -IDLE_COUNTER_MAX = 59 # time in seconds beyond any filtered or serving packet to terminate the stream +IDLE_COUNTER_MAX = 140 # four times the timeout * retries to terminate the stream in seconds STARTUP_IDLE_COUNTER = 40 # time to wait for an initial stream # code assumes a timeout response in TVH of 15 or higher. @@ -50,7 +50,6 @@ class InternalProxy(Stream): is_m3u8_starting = 0 - def __init__(self, _plugins, _hdhr_queue): global MAX_OUT_QUEUE_SIZE self.last_refresh = None diff --git a/lib/streams/m3u8_queue.py b/lib/streams/m3u8_queue.py index 14f62c2..4d6bf2a 100644 --- a/lib/streams/m3u8_queue.py +++ b/lib/streams/m3u8_queue.py @@ -74,20 +74,22 @@ def run(self): m3u8_data = self.process_m3u8_item(self.queue_item) PROCESSED_URLS[self.uid_counter] = m3u8_data STREAM_QUEUE.put({'uri_dt': 'check_processed_list'}) + self.logger.debug('M3U8GetUriData terminated COUNTER {} {} {}'.format(self.uid_counter, os.getpid(), threading.get_ident())) m3u8_data = None self.queue_item = None self.uid_counter = None self.video = None self.pts_validation = None - self.logger.debug('M3U8GetUriData terminated {} {}'.format(os.getpid(), threading.get_ident())) self.logger = None @handle_url_except() - def get_uri_data(self, _uri): - self.logger.warning(M3U8Queue.http_header) - resp = M3U8Queue.http_session.get(_uri, headers=M3U8Queue.http_header, timeout=5, follow_redirects=True) + def get_uri_data(self, _uri, _retries): + """ + _retries is used by the decorator when a HTTP failure occurs + """ + resp = M3U8Queue.http_session.get(_uri, headers=M3U8Queue.http_header, timeout=8, follow_redirects=True) x = resp.content resp.raise_for_status() return x @@ -101,7 +103,7 @@ def decrypt_stream(self, _data): self.logger.warning('Unknown protocol, aborting {} {}'.format(os.getpid(), _data['key']['uri'])) return False else: - key_data = self.get_uri_data(_data['key']['uri']) + key_data = self.get_uri_data(_data['key']['uri'], 3) if key_data is not None: M3U8Queue.key_list[_data['key']['uri']] = key_data @@ -171,6 +173,7 @@ def process_m3u8_item(self, _queue_item): global TERMINATE_REQUESTED global PLAY_LIST global OUT_QUEUE + global UID_PROCESSED uri_dt = _queue_item['uri_dt'] data = _queue_item['data'] if data['filtered']: @@ -185,15 +188,16 @@ def process_m3u8_item(self, _queue_item): else: count = 1 while count > 0: - self.video.data = self.get_uri_data(data['uri']) + self.video.data = self.get_uri_data(data['uri'], 3) if self.video.data: break # TBD WHAT TO DO WITH THIS? - out_queue_put({'uri': 'extend', - 'data': data, - 'stream': None, - 'atsc': None}) + if count > 1: + out_queue_put({'uri': 'extend', + 'data': data, + 'stream': None, + 'atsc': None}) count -= 1 if uri_dt not in PLAY_LIST.keys(): @@ -234,6 +238,11 @@ def process_m3u8_item(self, _queue_item): 'atsc': None} atsc_default_msg = self.atsc_processing() PLAY_LIST[uri_dt]['played'] = True + if self.uid_counter > UID_PROCESSED+1: + out_queue_put({'uri': 'extend', + 'data': data, + 'stream': None, + 'atsc': None}) return {'uri': data['uri'], 'data': data, 'stream': self.video.data, @@ -290,10 +299,10 @@ def run(self): try: while not TERMINATE_REQUESTED: queue_item = STREAM_QUEUE.get() - self.logger.warning('QUEUE SIZE: {}'.format(STREAM_QUEUE.qsize())) if queue_item['uri_dt'] == 'terminate': self.logger.debug('Received terminate from internalproxy {}'.format(os.getpid())) TERMINATE_REQUESTED = True + break elif queue_item['uri_dt'] == 'status': out_queue_put({'uri': 'running', @@ -302,21 +311,22 @@ def run(self): 'atsc': None}) continue elif queue_item['uri_dt'] == 'check_processed_list': - self.logger.warning('#### Received check_processed_list {} COUNTER: {} PROCESSED: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED)) + self.logger.warning('#### Received check_processed_list {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {} QUEUE: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize())) self.check_processed_list() time.sleep(.1) continue - self.logger.warning('**** Received check_processed_list {} COUNTER: {} PROCESSED: {} QUEUE: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS))) + self.logger.warning('**** Received check_processed_list {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS))) self.check_processed_list() - while UID_COUNTER - UID_PROCESSED > 4: - self.logger.warning('SLOWING PROCESSING DUE TO BACKUP') - time.sleep(.4) + while UID_COUNTER - UID_PROCESSED - len(PROCESSED_URLS) > 4: + self.logger.warning('SLOWING PROCESSING: {} PROCESSED: {} PROCESSED_Q: {} QUEUE: {}'.format(UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize())) + time.sleep(.5) self.check_processed_list() + time.sleep(.1) self.process_queue = M3U8GetUriData(queue_item, UID_COUNTER, self.config) time.sleep(.1) - self.logger.warning('thread running for {} COUNTER: {} PROCESSED: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED)) + self.logger.warning('thread running for {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS))) UID_COUNTER += 1 except (KeyboardInterrupt, EOFError) as ex: TERMINATE_REQUESTED = True @@ -357,8 +367,8 @@ def check_processed_list(self): global PROCESSED_URLS if len(PROCESSED_URLS) > 0: first_key = sorted(PROCESSED_URLS.keys())[0] + self.logger.warning('FIRST KEY={} UID_PROCESSED={}'.format(first_key, UID_PROCESSED)) if first_key == UID_PROCESSED: - self.logger.warning('FIRST KEY={}'.format(first_key)) out_queue_put(PROCESSED_URLS[first_key]) del PROCESSED_URLS[first_key] UID_PROCESSED += 1