Skip to content

Commit

Permalink
Add Task.immediate field and prioritize them on workers
Browse files Browse the repository at this point in the history
* immediate prioritization

This defines tasks of the type "immediate" by adding a Task.immediate(bool)
field to the Task model. These type of task are ordered first in the
worker's query to pick up a new available task.

* misc

This also adds a "deferred" field to the Task model, so we can later
harden the constraint that 'deferred == False' and immediate == True'
shouldn't get past the API level. This is possible today in some
edge case failure scenarios on dispatch.

Closes #5767
  • Loading branch information
pedro-psb committed Oct 2, 2024
1 parent 79b87ef commit 697f97a
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CHANGES/5767.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a formal "immediate" type of Task and changed workers behavior to prioritize those.
This labeling is exlusive to plugin code and should only be applied where it's known that
the task will finish shortly, like in updates of repositories, remotes, and distributions.
18 changes: 18 additions & 0 deletions pulpcore/app/migrations/0124_task_immediate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.16 on 2024-09-27 19:05

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("core", "0123_upstreampulp_q_select"),
]

operations = [
migrations.AddField(
model_name="task",
name="immediate",
field=models.BooleanField(default=False),
),
]
11 changes: 11 additions & 0 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ class Task(BaseModel, AutoAddObjPermsMixin):
state (models.TextField): The state of the task
name (models.TextField): The name of the task
logging_cid (models.TextField): The logging CID associated with the task
unblocked_at (models.DateTimeField): The time the task was marked as unblocked.
This is supervised/updated by all awake workers and is part of the definition
of a ready-to-be-taken task.
started_at (models.DateTimeField): The time the task started executing
finished_at (models.DateTimeField): The time the task finished executing
error (models.JSONField): Fatal errors generated by the task
Expand All @@ -89,6 +92,11 @@ class Task(BaseModel, AutoAddObjPermsMixin):
the task
reserved_resources_record (django.contrib.postgres.fields.ArrayField): The reserved
resources required for the task.
immediate (models.BooleanField): Whether this is guaranteed to execute fast
without blocking. Defaults to `False`.
deferred (models.BooleanField): Whether to allow defer running the task to a
pulpcore_worker. Both `immediate` and `deferred` cannot both be `False`.
Defaults to `True`.
Relations:
Expand Down Expand Up @@ -126,6 +134,9 @@ class Task(BaseModel, AutoAddObjPermsMixin):

profile_artifacts = models.ManyToManyField("Artifact", through=ProfileArtifact)

immediate = models.BooleanField(default=False, null=True)
deferred = models.BooleanField(default=True, null=True)

def __str__(self):
return "Task: {name} [{state}]".format(name=self.name, state=self.state)

Expand Down
1 change: 0 additions & 1 deletion pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
TASK_METRICS_HEARTBEAT_LOCK = 74
STORAGE_METRICS_LOCK = 72


#: All valid task states.
TASK_STATES = SimpleNamespace(
WAITING="waiting",
Expand Down
1 change: 1 addition & 0 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def dispatch(
parent_task=Task.current(),
reserved_resources_record=resources,
versions=versions,
immediate=immediate,
)
if newest_created and task.pulp_created <= newest_created:
# Let this workaround not row forever into the future.
Expand Down
44 changes: 29 additions & 15 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ def beat(self):
self.worker_cleanup()
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_SCHEDULING_LOCK):
dispatch_scheduled_tasks()
# This "reporting code" must not me moved inside a task, because it is supposed
# to be able to report on a congested tasking system to produce reliable results.
self.record_unblocked_waiting_tasks_metric()

def notify_workers(self):
Expand Down Expand Up @@ -234,8 +236,11 @@ def is_compatible(self, task):
return False
return True

def identify_unblocked_tasks(self):
"""Iterate over waiting tasks and mark them unblocked accordingly."""
def unblock_tasks(self):
"""Iterate over waiting tasks and mark them unblocked accordingly.
Returns `True` if at least one task was unblocked. `False` otherwise.
"""

changed = False
taken_exclusive_resources = set()
Expand Down Expand Up @@ -296,33 +301,33 @@ def identify_unblocked_tasks(self):

def iter_tasks(self):
"""Iterate over ready tasks and yield each task while holding the lock."""

while not self.shutdown_requested:
# When batching this query, be sure to use "pulp_created" as a cursor
for task in Task.objects.filter(
state__in=TASK_INCOMPLETE_STATES, unblocked_at__isnull=False
).order_by("pulp_created"):
state__in=TASK_INCOMPLETE_STATES,
unblocked_at__isnull=False,
).order_by("-immediate", "pulp_created"):
# This code will only be called if we acquired the lock successfully
# The lock will be automatically be released at the end of the block
with contextlib.suppress(AdvisoryLockError), task:
# This code will only be called if we acquired the lock successfully
# The lock will be automatically be released at the end of the block
# Check if someone else changed the task before we got the lock
task.refresh_from_db()

if task.state == TASK_STATES.CANCELING and task.worker is None:
# No worker picked this task up before being canceled
if self.cancel_abandoned_task(task, TASK_STATES.CANCELED):
# Continue looking for the next task
# without considering this tasks resources
# as we just released them
# Continue looking for the next task without considering this
# tasks resources, as we just released them
continue
if task.state in [TASK_STATES.RUNNING, TASK_STATES.CANCELING]:
# A running task without a lock must be abandoned
if self.cancel_abandoned_task(
task, TASK_STATES.FAILED, "Worker has gone missing."
):
# Continue looking for the next task
# without considering this tasks resources
# as we just released them
# Continue looking for the next task without considering this
# tasks resources, as we just released them
continue

# This statement is using lazy evaluation
if (
task.state == TASK_STATES.WAITING
Expand All @@ -333,6 +338,7 @@ def iter_tasks(self):
# Start from the top of the Task list
break
else:
# No task found in the for-loop
break

def sleep(self):
Expand Down Expand Up @@ -399,7 +405,7 @@ def supervise_task(self, task):
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(
TASK_UNBLOCKING_LOCK
):
self.identify_unblocked_tasks()
self.unblock_tasks()
self.wakeup = False
if task_process.sentinel in r:
if not task_process.is_alive():
Expand All @@ -422,6 +428,7 @@ def supervise_task(self, task):
)
cancel_state = TASK_STATES.FAILED
cancel_reason = "Aborted during worker shutdown."

task_process.join()
if not cancel_state and task_process.exitcode != 0:
_logger.warning(
Expand All @@ -445,11 +452,16 @@ def supervise_task(self, task):
self.task = None

def handle_available_tasks(self):
"""Handle available tasks in a monitor/supervise cycle.
The cycle must spin until there are no more available tasks. Any flaw in detecting this
can lead to stale tasks in the database.
"""
keep_looping = True
while keep_looping and not self.shutdown_requested:
try:
with PGAdvisoryLock(TASK_UNBLOCKING_LOCK):
keep_looping = self.identify_unblocked_tasks()
keep_looping = self.unblock_tasks()
except AdvisoryLockError:
keep_looping = True
for task in self.iter_tasks():
Expand Down Expand Up @@ -509,11 +521,13 @@ def run(self, burst=False):
else:
self.cursor.execute("LISTEN pulp_worker_wakeup")
while not self.shutdown_requested:
# do work
if self.shutdown_requested:
break
self.handle_available_tasks()
if self.shutdown_requested:
break
# rest until notified to wakeup
self.sleep()
self.cursor.execute("UNLISTEN pulp_worker_wakeup")
self.cursor.execute("UNLISTEN pulp_worker_metrics_heartbeat")
Expand Down

0 comments on commit 697f97a

Please sign in to comment.