-
Notifications
You must be signed in to change notification settings - Fork 114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve throughput for immediate tasks (1/3) #5841
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
Added a formal "immediate" type of Task and changed workers behavior to prioritize those. | ||
This labeling is exclusive to plugin code and should only be applied where it's known that | ||
the task will finish shortly, like in updates of repositories, remotes, and distributions. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Generated by Django 4.2.16 on 2024-10-02 17:42 | ||
|
||
from django.db import migrations, models | ||
|
||
|
||
class Migration(migrations.Migration): | ||
|
||
dependencies = [ | ||
("core", "0123_upstreampulp_q_select"), | ||
] | ||
|
||
operations = [ | ||
migrations.AddField( | ||
model_name="task", | ||
name="deferred", | ||
field=models.BooleanField(default=True, null=True), | ||
), | ||
migrations.AddField( | ||
model_name="task", | ||
name="immediate", | ||
field=models.BooleanField(default=False, null=True), | ||
), | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -175,6 +175,8 @@ def beat(self): | |
self.worker_cleanup() | ||
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK): | ||
dispatch_scheduled_tasks() | ||
# This "reporting code" must not be moved inside a task, because it is supposed | ||
# to be able to report on a congested tasking system to produce reliable results. | ||
self.record_unblocked_waiting_tasks_metric() | ||
|
||
def notify_workers(self): | ||
|
@@ -234,8 +236,11 @@ def is_compatible(self, task): | |
return False | ||
return True | ||
|
||
def identify_unblocked_tasks(self): | ||
"""Iterate over waiting tasks and mark them unblocked accordingly.""" | ||
def unblock_tasks(self): | ||
"""Iterate over waiting tasks and mark them unblocked accordingly. | ||
Returns `True` if at least one task was unblocked. `False` otherwise. | ||
""" | ||
|
||
changed = False | ||
taken_exclusive_resources = set() | ||
|
@@ -296,33 +301,33 @@ def identify_unblocked_tasks(self): | |
|
||
def iter_tasks(self): | ||
"""Iterate over ready tasks and yield each task while holding the lock.""" | ||
|
||
while not self.shutdown_requested: | ||
# When batching this query, be sure to use "pulp_created" as a cursor | ||
for task in Task.objects.filter( | ||
state__in=TASK_INCOMPLETE_STATES, unblocked_at__isnull=False | ||
).order_by("pulp_created"): | ||
state__in=TASK_INCOMPLETE_STATES, | ||
unblocked_at__isnull=False, | ||
).order_by("-immediate", "pulp_created"): | ||
# This code will only be called if we acquired the lock successfully | ||
# The lock will automatically be released at the end of the block | ||
with contextlib.suppress(AdvisoryLockError), task: | ||
# This code will only be called if we acquired the lock successfully | ||
# The lock will automatically be released at the end of the block | ||
# Check if someone else changed the task before we got the lock | ||
task.refresh_from_db() | ||
|
||
if task.state == TASK_STATES.CANCELING and task.worker is None: | ||
# No worker picked this task up before being canceled | ||
if self.cancel_abandoned_task(task, TASK_STATES.CANCELED): | ||
# Continue looking for the next task | ||
# without considering this tasks resources | ||
# as we just released them | ||
# Continue looking for the next task without considering this | ||
# task's resources, as we just released them | ||
continue | ||
if task.state in [TASK_STATES.RUNNING, TASK_STATES.CANCELING]: | ||
# A running task without a lock must be abandoned | ||
if self.cancel_abandoned_task( | ||
task, TASK_STATES.FAILED, "Worker has gone missing." | ||
): | ||
# Continue looking for the next task | ||
# without considering this tasks resources | ||
# as we just released them | ||
# Continue looking for the next task without considering this | ||
# task's resources, as we just released them | ||
continue | ||
|
||
# This statement is using lazy evaluation | ||
if ( | ||
task.state == TASK_STATES.WAITING | ||
|
@@ -333,6 +338,7 @@ def iter_tasks(self): | |
# Start from the top of the Task list | ||
break | ||
else: | ||
# No task found in the for-loop | ||
break | ||
|
||
def sleep(self): | ||
|
@@ -399,7 +405,7 @@ def supervise_task(self, task): | |
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock( | ||
TASK_UNBLOCKING_LOCK | ||
): | ||
self.identify_unblocked_tasks() | ||
self.unblock_tasks() | ||
self.wakeup = False | ||
if task_process.sentinel in r: | ||
if not task_process.is_alive(): | ||
|
@@ -422,6 +428,7 @@ def supervise_task(self, task): | |
) | ||
cancel_state = TASK_STATES.FAILED | ||
cancel_reason = "Aborted during worker shutdown." | ||
|
||
task_process.join() | ||
if not cancel_state and task_process.exitcode != 0: | ||
_logger.warning( | ||
|
@@ -445,11 +452,16 @@ def supervise_task(self, task): | |
self.task = None | ||
|
||
def handle_available_tasks(self): | ||
"""Handle available tasks in a monitor/supervise cycle. | ||
The cycle must spin until there are no more available tasks. Any flaw in detecting this | ||
can lead to stale tasks in the database. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually there is no such thing as a stale task. The thing that can happen here is a stuck tasking system. (TBF, it only needs one additional worker startup or one more dispatch to unstick it, but we should really avoid it.) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What about:
I also could leave just the first line if you think that's too much detail for a docstring. |
||
""" | ||
keep_looping = True | ||
while keep_looping and not self.shutdown_requested: | ||
try: | ||
with PGAdvisoryLock(TASK_UNBLOCKING_LOCK): | ||
keep_looping = self.identify_unblocked_tasks() | ||
keep_looping = self.unblock_tasks() | ||
except AdvisoryLockError: | ||
keep_looping = True | ||
for task in self.iter_tasks(): | ||
|
@@ -509,11 +521,13 @@ def run(self, burst=False): | |
else: | ||
self.cursor.execute("LISTEN pulp_worker_wakeup") | ||
while not self.shutdown_requested: | ||
# do work | ||
if self.shutdown_requested: | ||
break | ||
self.handle_available_tasks() | ||
if self.shutdown_requested: | ||
break | ||
# rest until notified to wakeup | ||
self.sleep() | ||
self.cursor.execute("UNLISTEN pulp_worker_wakeup") | ||
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and deferred…