Skip to content

Commit

Permalink
data store: remove threading
Browse files Browse the repository at this point in the history
* Closes cylc#194
* Run subscribers via asyncio rather than the ThreadPoolExecutor.
* The only potentially blocking operations in the code being called were:
  - time.sleep  (has an async variant)
  - asyncio.sleep (async)
  - self.socket.recv_multipart (async)
* Refactored the code so that the underlying async functions could be
  called via asyncio.
  • Loading branch information
oliver-sanders committed Apr 22, 2024
1 parent b007b5f commit 1970ef9
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 65 deletions.
24 changes: 8 additions & 16 deletions cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,16 +327,6 @@ class CylcUIServer(ExtensionApp):
''',
default_value=1
)
max_threads = Int(
config=True,
help='''
Set the maximum number of threads the Cylc UI Server can use.
This determines the maximum number of active workflows that the
server can track.
''',
default_value=100,
)
profile = Bool(
config=True,
help='''
Expand Down Expand Up @@ -410,7 +400,6 @@ def __init__(self, *args, **kwargs):
self.data_store_mgr = DataStoreMgr(
self.workflows_mgr,
self.log,
self.max_threads,
)
# sub_status dictionary storing status of subscriptions
self.sub_statuses = {}
Expand Down Expand Up @@ -450,10 +439,16 @@ def initialize_settings(self):
)
self.profiler.start()

# start up the data store manager update task
ioloop.IOLoop.current().add_callback(
self.data_store_mgr.startup
)

# start the async scan task running (do this on server start not init)
ioloop.IOLoop.current().add_callback(
self.workflows_mgr.run
)

# configure the scan interval
ioloop.PeriodicCallback(
self.workflows_mgr.scan,
Expand Down Expand Up @@ -571,12 +566,9 @@ def launch_instance(cls, argv=None, workflow_id=None, **kwargs):
del os.environ["JUPYTER_RUNTIME_DIR"]

async def stop_extension(self):
self.profiler.stop()
# stop the async scan task
await self.workflows_mgr.stop()
for sub in self.data_store_mgr.w_subs.values():
sub.stop()
# Shutdown the thread pool executor
self.data_store_mgr.executor.shutdown(wait=False)
self.data_store_mgr.shutdown()
# Destroy ZeroMQ context of all sockets
self.workflows_mgr.context.destroy()
self.profiler.stop()
153 changes: 104 additions & 49 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
"""

import asyncio
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from functools import wraps
from pathlib import Path
import time
from typing import Dict, Optional, Set
from typing import Dict, Optional, Set, NamedTuple

from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.id import Tokens
Expand All @@ -57,7 +57,7 @@
from .workflows_mgr import workflow_request


def log_call(fcn):
def _log_call(fcn):
"""Decorator for data store methods we want to log."""
fcn_name = f'[data-store] {fcn.__name__}'

Expand All @@ -72,6 +72,34 @@ def _inner(*args, **kwargs): # works for serial & async calls
return _inner


def _call_to_tuple(fcn):
"""Turns function calls into an (args, kwargs) tuple.
Examples:
>>> _call_to_tuple(list)(1, 2, a=3, b=4)
[(1, 2), {'a': 3, 'b': 4}]
"""
@wraps(fcn)
def _inner(*args, **kwargs):
nonlocal fcn
return fcn((args, kwargs))

return _inner


class ActiveSubscription(NamedTuple):
    """A live subscription to a running workflow.

    Attributes:
        subscriber: The subscriber client object.
        task: The asyncio task running the client.

    """

    # client receiving published deltas from the workflow
    subscriber: WorkflowSubscriber
    # background task driving the subscriber's receive loop
    task: asyncio.Task


class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows.
Expand All @@ -80,14 +108,6 @@ class DataStoreMgr:
Service that scans for workflows.
log:
Application logger.
max_threads:
Max number of threads to use for subscriptions.
Note, this determines the maximum number of active workflows that
can be updated.
This should be overridden for real use in the UIS app. The
default is here for test purposes.
"""

Expand All @@ -96,17 +116,36 @@ class DataStoreMgr:
RECONCILE_TIMEOUT = 5. # seconds
PENDING_DELTA_CHECK_INTERVAL = 0.5

def __init__(self, workflows_mgr, log):
    """Initialise the data store manager.

    Args:
        workflows_mgr: Service that scans for workflows.
        log: Application logger.

    """
    self.workflows_mgr = workflows_mgr
    self.log = log
    # per-workflow data stores, keyed by workflow ID
    self.data = {}
    # active workflow subscriptions, keyed by workflow ID
    self.active_subscriptions: Dict[str, ActiveSubscription] = {}
    # ZMQ topics to subscribe to on each workflow
    self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
    # NOTE(review): event loop reference, presumably set lazily elsewhere;
    # confirm it is still required now subscriptions run via asyncio
    self.loop = None
    # per-workflow queues of deltas awaiting dispatch
    self.delta_queues = {}
    # incoming subscriber messages as (args, kwargs) tuples, consumed by
    # the message processor task started in startup()
    self.message_queue = asyncio.Queue()

def startup(self):
    """Begin background processing of queued subscriber messages.

    Note: Call this after the asyncio event loop has been opened.
    """
    processor = self._process_message_queues()
    self.message_processor_task = asyncio.create_task(processor)

def shutdown(self):
    """Stop the data store manager.

    This will stop any active subscribers.

    Note: It will not wait for pending messages to be processed.
    """
    # Iterate over a snapshot of the keys: _stop_subscription() deletes
    # entries from active_subscriptions, and mutating a dict while
    # iterating it raises RuntimeError.
    for w_id in list(self.active_subscriptions):
        self._stop_subscription(w_id)
    self.message_processor_task.cancel()

@log_call
@_log_call
async def register_workflow(self, w_id: str, is_active: bool) -> None:
"""Register a new workflow with the data store.
Expand All @@ -126,7 +165,7 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
status_msg=self._get_status_msg(w_id, is_active),
)

@log_call
@_log_call
async def unregister_workflow(self, w_id):
"""Remove a workflow from the data store entirely.
Expand All @@ -141,7 +180,7 @@ async def unregister_workflow(self, w_id):
await asyncio.sleep(self.PENDING_DELTA_CHECK_INTERVAL)
self._purge_workflow(w_id)

@log_call
@_log_call
async def connect_workflow(self, w_id, contact_data):
"""Initiate workflow subscriptions.
Expand All @@ -152,23 +191,17 @@ async def connect_workflow(self, w_id, contact_data):
blocking the main loop.
"""
if self.loop is None:
self.loop = asyncio.get_running_loop()

# don't sync if subscription exists
if w_id in self.w_subs:
if w_id in self.active_subscriptions:
return

self.delta_queues[w_id] = {}

# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
self.executor.submit(
self._start_subscription,
self._start_subscription(
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PUBLISH_PORT]
contact_data[CFF.PUBLISH_PORT],
)
successful_updates = await self._entire_workflow_update(ids=[w_id])

Expand All @@ -182,7 +215,7 @@ async def connect_workflow(self, w_id, contact_data):
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)

@log_call
@_log_call
def disconnect_workflow(self, w_id, update_contact=True):
"""Terminate workflow subscriptions.
Expand All @@ -204,9 +237,7 @@ def disconnect_workflow(self, w_id, update_contact=True):
status=WorkflowStatus.STOPPED.value,
status_msg=disconnect_msg,
)
if w_id in self.w_subs:
self.w_subs[w_id].stop()
del self.w_subs[w_id]
self._stop_subscription(w_id)

def get_workflows(self):
"""Return all workflows the data store is currently tracking.
Expand All @@ -226,7 +257,7 @@ def get_workflows(self):
active.add(w_id)
return active, inactive

@log_call
@_log_call
def _purge_workflow(self, w_id):
"""Purge the manager of a workflow's subscription and data."""
# Ensure no old/new subscriptions exist on purge,
Expand All @@ -237,36 +268,61 @@ def _purge_workflow(self, w_id):
if w_id in self.delta_queues:
del self.delta_queues[w_id]

def _start_subscription(self, w_id: str, reg: str, host: str, port: int):
    """Instantiate and run subscriber data-store sync.

    Args:
        w_id: Workflow external ID.
        reg: Registered workflow name.
        host: Hostname of target workflow.
        port: Port of target workflow.

    """
    subscriber = WorkflowSubscriber(
        reg,
        host=host,
        port=port,
        context=self.workflows_mgr.context,
        topics=self.topics,
    )
    # Run the subscriber as an asyncio task. Received deltas are pushed
    # onto the message queue as (args, kwargs) tuples for the message
    # processor task to apply, rather than being handled in a thread.
    subscriber_task = asyncio.create_task(
        subscriber.subscribe(
            process_delta_msg,
            func=_call_to_tuple(self.message_queue.put_nowait),
            w_id=w_id,
        )
    )
    self.active_subscriptions[w_id] = ActiveSubscription(
        subscriber, subscriber_task
    )

def _stop_subscription(self, w_id: str) -> None:
    """Stop an active subscription.

    No-op if there is no active subscription for this workflow.

    Args:
        w_id: Workflow external ID.

    """
    active = self.active_subscriptions.pop(w_id, None)
    if active is not None:
        active.subscriber.stop()

async def _process_message_queues(self):
    """Apply queued subscriber messages as they arrive.

    Runs until cancelled (see shutdown).
    """
    while True:
        call_args, call_kwargs = await self.message_queue.get()
        await self._update_workflow_data(*call_args, **call_kwargs)

async def _update_workflow_data(
self,
topic: str,
delta,
w_id: str,
) -> None:
"""Manage and apply incoming data-store deltas.
Args:
topic (str): topic of published data.
delta (object): Published protobuf message data container.
w_id (str): Workflow external ID.
topic: topic of published data.
delta: Published protobuf message data container.
w_id: Workflow external ID.
"""
# wait until data-store is populated for this workflow
Expand All @@ -275,7 +331,7 @@ def _update_workflow_data(self, topic, delta, w_id):
while loop_cnt < self.INIT_DATA_WAIT_TIME:
if w_id in self.data:
break
time.sleep(self.INIT_DATA_RETRY_DELAY)
await asyncio.sleep(self.INIT_DATA_RETRY_DELAY)
loop_cnt += 1
continue
if topic == 'shutdown':
Expand Down Expand Up @@ -351,7 +407,6 @@ def _reconcile_update(self, topic, delta, w_id):
'pb_data_elements',
args={'element_type': topic}
),
self.loop
)
new_delta_msg = future.result(self.RECONCILE_TIMEOUT)
new_delta = DELTAS_MAP[topic]()
Expand Down

0 comments on commit 1970ef9

Please sign in to comment.