Skip to content

Commit

Permalink
Fix: [ALAS] Limit pool size when using run_in_executor()
Browse files Browse the repository at this point in the history
  • Loading branch information
LmeSzinc committed Sep 9, 2024
1 parent b1f64fb commit febdb79
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 2 deletions.
9 changes: 8 additions & 1 deletion module/device/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,18 +715,25 @@ def adb_brute_force_connect(self, serial_list):
serial_list (list[str]):
"""
import asyncio
from concurrent.futures import ThreadPoolExecutor
ev = asyncio.new_event_loop()
pool = ThreadPoolExecutor(
max_workers=len(serial_list),
thread_name_prefix='adb_brute_force_connect',
)

def _connect(serial):
msg = self.adb_client.connect(serial)
logger.info(msg)
return msg

async def connect():
tasks = [ev.run_in_executor(None, _connect, serial) for serial in serial_list]
tasks = [ev.run_in_executor(pool, _connect, serial) for serial in serial_list]
await asyncio.gather(*tasks)

ev.run_until_complete(connect())
pool.shutdown(wait=False)
ev.close()

@Config.when(DEVICE_OVER_HTTP=True)
def adb_connect(self):
Expand Down
32 changes: 31 additions & 1 deletion module/device/method/nemu_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,25 @@ def __enter__(self):

def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
if has_cached_property(self, '_ev'):
self._ev.close()
del_cached_property(self, '_ev')
if has_cached_property(self, '_pool'):
self._pool.shutdown(wait=False)
del_cached_property(self, '_pool')

@cached_property
def _ev(self):
return asyncio.new_event_loop()

@cached_property
def _pool(self):
from concurrent.futures import ThreadPoolExecutor
return ThreadPoolExecutor(
max_workers=1,
thread_name_prefix='NemuIpc',
)

async def ev_run_async(self, func, *args, timeout=0.15, **kwargs):
"""
Args:
Expand All @@ -294,7 +308,7 @@ async def ev_run_async(self, func, *args, timeout=0.15, **kwargs):
func_wrapped = partial(func, *args, **kwargs)
# Increased timeout for slow PCs
# Default screenshot interval is 0.2s, so a 0.15s timeout would have a fast retry without extra time costs
result = await asyncio.wait_for(self._ev.run_in_executor(None, func_wrapped), timeout=timeout)
result = await asyncio.wait_for(self._ev.run_in_executor(self._pool, func_wrapped), timeout=timeout)
return result

def ev_run_sync(self, func, *args, **kwargs):
Expand Down Expand Up @@ -604,3 +618,19 @@ def drag_nemu_ipc(self, p1, p2, point_random=(-10, -10, 10, 10)):

self.nemu_ipc.up()
self.sleep(0.050)

def show_thread():
logger.hr('Show thread')
import threading
for thread in threading.enumerate():
if thread._target is None:
name = None
else:
name = thread._target.__name__
logger.info([thread.name, thread.is_alive(), name])

if __name__ == '__main__':
self = NemuIpc('alas')
show_thread()
self.screenshot_nemu_ipc()
show_thread()

0 comments on commit febdb79

Please sign in to comment.