Skip to content

Commit

Permalink
Changed HTTP retires to not hard coded
Browse files Browse the repository at this point in the history
  • Loading branch information
rocky4546 committed Dec 4, 2023
1 parent a128521 commit b3a36f5
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 24 deletions.
9 changes: 8 additions & 1 deletion lib/common/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,19 @@ def handle_url_except(f=None, timeout=None):

def wrapper_func(self, *args, **kwargs):
ex_save = None
# arg0 = uri, arg1=retries
if len(args) == 0:
self.logger.warning('get uri called with no args f:{}'.format(f))
arg0 = 'None'
retries = 2
elif len(args) == 1:
arg0 = args[0]
retries = 2
else:
arg0 = args[0]
i = 2
self.logger.warning('get uri called from {} with retires={}'.format(f, args[1]))
retries = args[1]
i = retries
is_done = 0
while i > is_done:
i -= 1
Expand Down
2 changes: 1 addition & 1 deletion lib/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import lib.common.exceptions as exceptions

VERSION = '0.9.14.00-RC02'
VERSION = '0.9.14.00-RC03'
CABERNET_URL = 'https://github.com/cabernetwork/cabernet'
CABERNET_ID = 'cabernet'
CABERNET_REPO = 'manifest.json'
Expand Down
4 changes: 2 additions & 2 deletions lib/plugins/plugin_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ def clean_group_name(self, group_name):
return re.sub('[ +&*%$#@!:;,<>?]', '', group_name)

@handle_url_except()
def get_thumbnail_size(self, _thumbnail, _ch_uid, ):
def get_thumbnail_size(self, _thumbnail, _retries, _ch_uid, ):
thumbnail_size = (0, 0)
if _thumbnail is None or _thumbnail == '':
return thumbnail_size
Expand Down Expand Up @@ -194,7 +194,7 @@ def get_thumbnail_size(self, _thumbnail, _ch_uid, ):
return thumbnail_size

@handle_url_except
def get_best_stream(self, _url, _channel_id, _referer=None):
def get_best_stream(self, _url, _retries, _channel_id, _referer=None):
if self.config_obj.data[self.config_section]['player-stream_type'] == 'm3u8redirect':
return _url

Expand Down
3 changes: 1 addition & 2 deletions lib/streams/internal_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@
from .stream import Stream

MAX_OUT_QUEUE_SIZE = 30
IDLE_COUNTER_MAX = 59 # time in seconds beyond any filtered or serving packet to terminate the stream
IDLE_COUNTER_MAX = 140 # four times the timeout * retries to terminate the stream in seconds
STARTUP_IDLE_COUNTER = 40 # time to wait for an initial stream
# code assumes a timeout response in TVH of 15 or higher.

class InternalProxy(Stream):

is_m3u8_starting = 0


def __init__(self, _plugins, _hdhr_queue):
global MAX_OUT_QUEUE_SIZE
self.last_refresh = None
Expand Down
46 changes: 28 additions & 18 deletions lib/streams/m3u8_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,22 @@ def run(self):
m3u8_data = self.process_m3u8_item(self.queue_item)
PROCESSED_URLS[self.uid_counter] = m3u8_data
STREAM_QUEUE.put({'uri_dt': 'check_processed_list'})
self.logger.debug('M3U8GetUriData terminated COUNTER {} {} {}'.format(self.uid_counter, os.getpid(), threading.get_ident()))
m3u8_data = None
self.queue_item = None
self.uid_counter = None
self.video = None
self.pts_validation = None
self.logger.debug('M3U8GetUriData terminated {} {}'.format(os.getpid(), threading.get_ident()))
self.logger = None



@handle_url_except()
def get_uri_data(self, _uri):
self.logger.warning(M3U8Queue.http_header)
resp = M3U8Queue.http_session.get(_uri, headers=M3U8Queue.http_header, timeout=5, follow_redirects=True)
def get_uri_data(self, _uri, _retries):
"""
_retries is used by the decorator when a HTTP failure occurs
"""
resp = M3U8Queue.http_session.get(_uri, headers=M3U8Queue.http_header, timeout=8, follow_redirects=True)
x = resp.content
resp.raise_for_status()
return x
Expand All @@ -101,7 +103,7 @@ def decrypt_stream(self, _data):
self.logger.warning('Unknown protocol, aborting {} {}'.format(os.getpid(), _data['key']['uri']))
return False
else:
key_data = self.get_uri_data(_data['key']['uri'])
key_data = self.get_uri_data(_data['key']['uri'], 3)

if key_data is not None:
M3U8Queue.key_list[_data['key']['uri']] = key_data
Expand Down Expand Up @@ -171,6 +173,7 @@ def process_m3u8_item(self, _queue_item):
global TERMINATE_REQUESTED
global PLAY_LIST
global OUT_QUEUE
global UID_PROCESSED
uri_dt = _queue_item['uri_dt']
data = _queue_item['data']
if data['filtered']:
Expand All @@ -185,15 +188,16 @@ def process_m3u8_item(self, _queue_item):
else:
count = 1
while count > 0:
self.video.data = self.get_uri_data(data['uri'])
self.video.data = self.get_uri_data(data['uri'], 3)
if self.video.data:
break

# TBD WHAT TO DO WITH THIS?
out_queue_put({'uri': 'extend',
'data': data,
'stream': None,
'atsc': None})
if count > 1:
out_queue_put({'uri': 'extend',
'data': data,
'stream': None,
'atsc': None})
count -= 1

if uri_dt not in PLAY_LIST.keys():
Expand Down Expand Up @@ -234,6 +238,11 @@ def process_m3u8_item(self, _queue_item):
'atsc': None}
atsc_default_msg = self.atsc_processing()
PLAY_LIST[uri_dt]['played'] = True
if self.uid_counter > UID_PROCESSED+1:
out_queue_put({'uri': 'extend',
'data': data,
'stream': None,
'atsc': None})
return {'uri': data['uri'],
'data': data,
'stream': self.video.data,
Expand Down Expand Up @@ -290,10 +299,10 @@ def run(self):
try:
while not TERMINATE_REQUESTED:
queue_item = STREAM_QUEUE.get()
self.logger.warning('QUEUE SIZE: {}'.format(STREAM_QUEUE.qsize()))
if queue_item['uri_dt'] == 'terminate':
self.logger.debug('Received terminate from internalproxy {}'.format(os.getpid()))
TERMINATE_REQUESTED = True

break
elif queue_item['uri_dt'] == 'status':
out_queue_put({'uri': 'running',
Expand All @@ -302,21 +311,22 @@ def run(self):
'atsc': None})
continue
elif queue_item['uri_dt'] == 'check_processed_list':
self.logger.warning('#### Received check_processed_list {} COUNTER: {} PROCESSED: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED))
self.logger.warning('#### Received check_processed_list {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {} QUEUE: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize()))
self.check_processed_list()
time.sleep(.1)
continue

self.logger.warning('**** Received check_processed_list {} COUNTER: {} PROCESSED: {} QUEUE: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS)))
self.logger.warning('**** Received check_processed_list {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS)))
self.check_processed_list()
while UID_COUNTER - UID_PROCESSED > 4:
self.logger.warning('SLOWING PROCESSING DUE TO BACKUP')
time.sleep(.4)
while UID_COUNTER - UID_PROCESSED - len(PROCESSED_URLS) > 4:
self.logger.warning('SLOWING PROCESSING: {} PROCESSED: {} PROCESSED_Q: {} QUEUE: {}'.format(UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS), STREAM_QUEUE.qsize()))
time.sleep(.5)
self.check_processed_list()
time.sleep(.1)
self.process_queue = M3U8GetUriData(queue_item, UID_COUNTER, self.config)
time.sleep(.1)

self.logger.warning('thread running for {} COUNTER: {} PROCESSED: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED))
self.logger.warning('thread running for {} COUNTER: {} PROCESSED: {} PROCESSED_Q: {}'.format(os.getpid(), UID_COUNTER, UID_PROCESSED, len(PROCESSED_URLS)))
UID_COUNTER += 1
except (KeyboardInterrupt, EOFError) as ex:
TERMINATE_REQUESTED = True
Expand Down Expand Up @@ -357,8 +367,8 @@ def check_processed_list(self):
global PROCESSED_URLS
if len(PROCESSED_URLS) > 0:
first_key = sorted(PROCESSED_URLS.keys())[0]
self.logger.warning('FIRST KEY={} UID_PROCESSED={}'.format(first_key, UID_PROCESSED))
if first_key == UID_PROCESSED:
self.logger.warning('FIRST KEY={}'.format(first_key))
out_queue_put(PROCESSED_URLS[first_key])
del PROCESSED_URLS[first_key]
UID_PROCESSED += 1
Expand Down

0 comments on commit b3a36f5

Please sign in to comment.