From 4a192418db51055ceb81fefc76c8e4635a0d415a Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Thu, 28 Mar 2024 11:48:24 +0000 Subject: [PATCH] data store: remove threading * Closes #194 * Run subscribers via asyncio rather than the ThreadPoolExecutor. * The only non-blocking operations in the code being called were: - time.sleep (has an async variant) - asyncio.sleep (async) - self.socket.recv_multipart (async) * Refactored the code so that the underlying async functions could be called via asyncio. --- cylc/uiserver/app.py | 13 ++-- cylc/uiserver/data_store_mgr.py | 131 +++++++++++++++++++++++--------- 2 files changed, 105 insertions(+), 39 deletions(-) diff --git a/cylc/uiserver/app.py b/cylc/uiserver/app.py index 8e9fb00c..8b809568 100644 --- a/cylc/uiserver/app.py +++ b/cylc/uiserver/app.py @@ -450,10 +450,16 @@ def initialize_settings(self): ) self.profiler.start() + # start up the data store manager update task + ioloop.IOLoop.current().add_callback( + self.data_store_mgr.startup + ) + # start the async scan task running (do this on server start not init) ioloop.IOLoop.current().add_callback( self.workflows_mgr.run ) + # configure the scan interval ioloop.PeriodicCallback( self.workflows_mgr.scan, @@ -571,12 +577,9 @@ def launch_instance(cls, argv=None, workflow_id=None, **kwargs): del os.environ["JUPYTER_RUNTIME_DIR"] async def stop_extension(self): + self.profiler.stop() # stop the async scan task await self.workflows_mgr.stop() - for sub in self.data_store_mgr.w_subs.values(): - sub.stop() - # Shutdown the thread pool executor - self.data_store_mgr.executor.shutdown(wait=False) + self.data_store_mgr.shutdown() # Destroy ZeroMQ context of all sockets self.workflows_mgr.context.destroy() - self.profiler.stop() diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index b7e526a0..a4cab089 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -32,11 +32,11 @@ """ import asyncio -from 
concurrent.futures import ThreadPoolExecutor from copy import deepcopy +from functools import wraps from pathlib import Path import time -from typing import Dict, Optional, Set +from typing import Dict, Optional, Set, NamedTuple from cylc.flow.exceptions import WorkflowStopped from cylc.flow.id import Tokens @@ -72,6 +72,34 @@ def _inner(*args, **kwargs): # works for serial & async calls return _inner +def call_to_tuple(fcn): + """Turns function calls into an (args, kwargs) tuple. + + Examples: + >>> xargs(list)(1, 2, a=3, b=4) + [(1, 2), {'a': 3, 'b': 4}] + + """ + @wraps(fcn) + def _inner(*args, **kwargs): + nonlocal fcn + return fcn((args, kwargs)) + + return _inner + + +class Subscriber(NamedTuple): + """Represents an active subscription. + + Args: + subscriber: The subscriber client object. + task: The asyncio task running the client. + + """ + subscriber: WorkflowSubscriber + task: asyncio.Task + + class DataStoreMgr: """Manage the local data-store acquisition/updates for all workflows. @@ -100,11 +128,30 @@ def __init__(self, workflows_mgr, log, max_threads=10): self.workflows_mgr = workflows_mgr self.log = log self.data = {} - self.w_subs: Dict[str, WorkflowSubscriber] = {} + self.subscribers: Dict[str, Subscriber] = {} self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'} - self.loop = None - self.executor = ThreadPoolExecutor(max_threads) self.delta_queues = {} + self.message_queue = asyncio.Queue() + + def startup(self): + """Start the data store manager. + + Note: Call this after the asyncio event loop has been opened. + """ + self.message_processor_task = asyncio.create_task( + self._process_message_queues() + ) + + def shutdown(self): + """Stop the data store manager. + + This will stop any active subscribers. + + Note: It will not wait for pending messages to be processed. 
for w_id in list(self.subscribers):  # snapshot: _stop_subscription deletes entries
""" - self.w_subs[w_id] = WorkflowSubscriber( + subscriber = WorkflowSubscriber( reg, host=host, port=port, context=self.workflows_mgr.context, - topics=self.topics + topics=self.topics, ) - self.w_subs[w_id].loop.run_until_complete( - self.w_subs[w_id].subscribe( + subscriber_task = asyncio.create_task( + subscriber.subscribe( process_delta_msg, - func=self._update_workflow_data, - w_id=w_id)) + func=call_to_tuple(self.message_queue.put_nowait), + w_id=w_id, + ) + ) + self.subscribers[w_id] = Subscriber(subscriber, subscriber_task) + + def _stop_subscription(self, w_id: str) -> None: + """Stop an active subscription. - def _update_workflow_data(self, topic, delta, w_id): + Args: + w_id: Workflow external ID. + + """ + if w_id in self.subscribers: + self.subscribers[w_id].subscriber.stop() + del self.subscribers[w_id] + + async def _process_message_queues(self): + """Wait for new messages, call the update method then they arrive.""" + while True: + args, kwargs = await self.message_queue.get() + await self._update_workflow_data(*args, **kwargs) + + async def _update_workflow_data( + self, + topic: str, + delta, + w_id: str, + ) -> None: """Manage and apply incoming data-store deltas. Args: - topic (str): topic of published data. - delta (object): Published protobuf message data container. - w_id (str): Workflow external ID. + topic: topic of published data. + delta: Published protobuf message data container. + w_id: Workflow external ID. 
""" # wait until data-store is populated for this workflow @@ -275,7 +339,7 @@ def _update_workflow_data(self, topic, delta, w_id): while loop_cnt < self.INIT_DATA_WAIT_TIME: if w_id in self.data: break - time.sleep(self.INIT_DATA_RETRY_DELAY) + await asyncio.sleep(self.INIT_DATA_RETRY_DELAY) loop_cnt += 1 continue if topic == 'shutdown': @@ -351,7 +415,6 @@ def _reconcile_update(self, topic, delta, w_id): 'pb_data_elements', args={'element_type': topic} ), - self.loop ) new_delta_msg = future.result(self.RECONCILE_TIMEOUT) new_delta = DELTAS_MAP[topic]()