Skip to content

Commit

Permalink
Define immediate tasks and prioritize them
Browse files Browse the repository at this point in the history
This defines tasks of the type "immediate" by adding a Task.immediate(bool)
field to the Task model. Non-immediate are called "long".

Then, these type of task is put in front of the line when the worker
queries for new available tasks.

Closes #5767
  • Loading branch information
pedro-psb committed Sep 27, 2024
1 parent a2645c9 commit 9ab7302
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGES/5767.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a formal "immediate" type of Task and changed workers behavior to prioritize those.
This labeling is exlusive to plugin code and should only be applied where it's known that
the task will finish shortly, like in updates of repositories, remotes, and distributions.
18 changes: 18 additions & 0 deletions pulpcore/app/migrations/0124_task_immediate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.16 on 2024-09-27 19:05

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("core", "0123_upstreampulp_q_select"),
]

operations = [
migrations.AddField(
model_name="task",
name="immediate",
field=models.BooleanField(default=False),
),
]
1 change: 1 addition & 0 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class Task(BaseModel, AutoAddObjPermsMixin):
versions = HStoreField(default=dict)

profile_artifacts = models.ManyToManyField("Artifact", through=ProfileArtifact)
immediate = models.BooleanField(default=False)

def __str__(self):
return "Task: {name} [{state}]".format(name=self.name, state=self.state)
Expand Down
6 changes: 6 additions & 0 deletions pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
STORAGE_METRICS_LOCK = 72


#: All valid task types.
TASK_TYPES = SimpleNamespace(
LONG="long",
IMMEDIATE="immediate",
)

#: All valid task states.
TASK_STATES = SimpleNamespace(
WAITING="waiting",
Expand Down
27 changes: 21 additions & 6 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import signal
import socket
import contextlib
import itertools
from datetime import datetime, timedelta
from multiprocessing import Process
from tempfile import TemporaryDirectory
Expand All @@ -24,6 +25,7 @@
TASK_SCHEDULING_LOCK,
TASK_UNBLOCKING_LOCK,
TASK_METRICS_HEARTBEAT_LOCK,
TASK_TYPES,
)
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus
Expand Down Expand Up @@ -297,7 +299,7 @@ def unblock_tasks(self):

return changed

def iter_tasks(self):
def iter_tasks(self, task_type=None):
"""Iterate over ready tasks and yield each task while holding the lock.
Args:
Expand All @@ -307,10 +309,21 @@ def iter_tasks(self):

def available_tasks():
# When batching any of these queries, be sure to use "pulp_created" as a cursor
return Task.objects.filter(
state__in=TASK_INCOMPLETE_STATES,
unblocked_at__isnull=False,
).order_by("pulp_created")
if task_type is TASK_TYPES.LONG:
return Task.objects.filter(
state__in=TASK_INCOMPLETE_STATES, unblocked_at__isnull=False, immediate=False
).order_by("pulp_created")

if task_type is TASK_TYPES.IMMEDIATE:
return Task.objects.filter(
state__in=TASK_STATES.WAITING, unblocked_at__isnull=False, immediate=True
).order_by("pulp_created")

if task_type is None: # Take all
return Task.objects.filter(
state__in=TASK_INCOMPLETE_STATES,
unblocked_at__isnull=False,
).order_by("pulp_created")

def abandon_incomplete_tasks(task, ignore=False):
"""Abandon tasks in incomplete states that meet certain criteria.
Expand Down Expand Up @@ -468,7 +481,9 @@ def handle_available_tasks(self):
with contextlib.suppress(AdvisoryLockError), PGAdvisoryLock(TASK_UNBLOCKING_LOCK):
stop_loop = self.unblock_tasks() is False
# supervise available tasks
for task in self.iter_tasks():
for task in itertools.chain(
self.iter_tasks(TASK_TYPES.IMMEDIATE), self.iter_tasks(TASK_TYPES.LONG)
):
self.supervise_task(task)

def record_unblocked_waiting_tasks_metric(self):
Expand Down

0 comments on commit 9ab7302

Please sign in to comment.