Skip to content

Commit

Permalink
Priority mgmt
Browse files Browse the repository at this point in the history
Signed-off-by: 0ssigeno <[email protected]>
  • Loading branch information
0ssigeno committed Apr 4, 2024
1 parent 2816ad5 commit b8f59bb
Show file tree
Hide file tree
Showing 13 changed files with 133 additions and 14 deletions.
8 changes: 5 additions & 3 deletions api_app/ingestors_manager/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
def pre_save_ingestor_config(sender, instance: IngestorConfig, *args, **kwargs):
from intel_owl.tasks import execute_ingestor

instance.user = User.objects.get_or_create(
username=f"{instance.name.title()}Ingestor"
)[0]
user = User.objects.get_or_create(username=f"{instance.name.title()}Ingestor")[0]
user.profile.task_priority = 7
user.profile.is_robot = True
user.profile.save()
instance.user = user

periodic_task = PeriodicTask.objects.update_or_create(
name=f"{instance.name.title()}Ingestor",
Expand Down
7 changes: 7 additions & 0 deletions api_app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ def retry(self):
runner.apply_async(
queue=get_queue_name(settings.CONFIG_QUEUE),
MessageGroupId=str(uuid.uuid4()),
priority=self.priority,
)

def set_final_status(self) -> None:
Expand Down Expand Up @@ -540,8 +541,13 @@ def _final_status_signature(self) -> Signature:
queue=get_queue_name(settings.CONFIG_QUEUE),
immutable=True,
MessageGroupId=str(uuid.uuid4()),
priority=self.priority,
)

@property
def priority(self):
return self.user.profile.task_priority

def _get_pipeline(
self,
analyzers: PythonConfigQuerySet,
Expand Down Expand Up @@ -1199,6 +1205,7 @@ def _signature_pipeline_status(cls, job, status: str) -> Signature:
queue=get_queue_name(settings.CONFIG_QUEUE),
immutable=True,
MessageGroupId=str(uuid.uuid4()),
priority=job.priority,
)

@property
Expand Down
1 change: 1 addition & 0 deletions api_app/queryset.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ def get_signatures(self, job) -> Generator[Signature, None, None]:
task_id=task_id,
immutable=True,
MessageGroupId=str(task_id),
priority=job.priority,
)


Expand Down
1 change: 1 addition & 0 deletions api_app/serializers/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ def create(self, validated_data: Dict) -> Job:
args=[job.pk],
queue=get_queue_name(settings.DEFAULT_QUEUE),
MessageGroupId=str(uuid.uuid4()),
priority=job.priority,
)

return job
Expand Down
1 change: 1 addition & 0 deletions api_app/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,7 @@ def perform_retry(report: AbstractReport):
queue=report.config.queue,
immutable=True,
MessageGroupId=str(uuid.uuid4()),
priority=report.job.priority,
)
runner()

Expand Down
4 changes: 4 additions & 0 deletions authentication/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ class UserProfileAdmin(admin.ModelAdmin):
"company_name",
"company_role",
"discover_from",
"task_priority",
"is_robot",
)
list_filter = ["task_priority", "is_robot"]


@admin.display(boolean=True)
def user_is_active(self, obj: UserProfile) -> bool:
Expand Down
4 changes: 4 additions & 0 deletions authentication/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@

class ApiAppAuthConfig(AppConfig):
name = "authentication"

@staticmethod
def ready() -> None:
from . import signals # noqa
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Generated by Django 4.2.11 on 2024-04-04 08:16

from django.conf import settings
import django.core.validators
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('authentication', '0002_migrate_from_durin'),
]

operations = [
migrations.AddField(
model_name='userprofile',
name='is_robot',
field=models.BooleanField(default=False),
),
migrations.AddField(
model_name='userprofile',
name='task_priority',
field=models.IntegerField(default=10, validators=[django.core.validators.MaxValueValidator(10), django.core.validators.MinValueValidator(1)]),
),
migrations.AlterField(
model_name='userprofile',
name='user',
field=models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='profile', to=settings.AUTH_USER_MODEL),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Generated by Django 4.2.11 on 2024-04-04 08:27

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('authentication', '0003_userprofile_is_robot_userprofile_task_priority_and_more'),
]

operations = [
migrations.AlterField(
model_name='userprofile',
name='company_name',
field=models.CharField(max_length=32, null=True, validators=[django.core.validators.MinLengthValidator(3)]),
),
migrations.AlterField(
model_name='userprofile',
name='company_role',
field=models.CharField(max_length=32, null=True, validators=[django.core.validators.MinLengthValidator(3)]),
),
]
28 changes: 28 additions & 0 deletions authentication/migrations/0005_create_profiles.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.11 on 2024-04-04 07:46

from django.conf import settings
from django.db import migrations


def migrate(apps, schema_editor):
User = apps.get_model(*settings.AUTH_USER_MODEL.split("."))
Profile = apps.get_model("authentication", "UserProfile")
for user in User.objects.all():
is_robot = user.username.endswith("Ingestor")
Profile.objects.create(
user=user, task_priority=7 if is_robot else 10, is_robot=is_robot
)


def reverse_migrate(apps, schema_editor):
Profile = apps.get_model("authentication", "UserProfile")
Profile.objects.all().delete()


class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("authentication", "0004_alter_userprofile_company_name_and_more"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
15 changes: 9 additions & 6 deletions authentication/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# See the file 'LICENSE' for copying permission.

from django.conf import settings
from django.core.validators import MinLengthValidator
from django.core.validators import MinLengthValidator, MaxValueValidator, MinValueValidator
from django.db import models

__all__ = [
Expand All @@ -24,17 +24,17 @@ class DiscoverFromChoices(models.TextChoices):


class UserProfile(models.Model):
# contants
# constants
DiscoverFromChoices = DiscoverFromChoices

# fields
user = models.OneToOneField(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
related_name="user_profile",
related_name="profile",
)
company_name = models.CharField(max_length=32, validators=[MinLengthValidator(3)])
company_role = models.CharField(max_length=32, validators=[MinLengthValidator(3)])
company_name = models.CharField(max_length=32, validators=[MinLengthValidator(3)], null=True)
company_role = models.CharField(max_length=32, validators=[MinLengthValidator(3)], null=True)
twitter_handle = models.CharField(
max_length=16, default="", blank=True, validators=[MinLengthValidator(3)]
)
Expand All @@ -43,7 +43,10 @@ class UserProfile(models.Model):
choices=DiscoverFromChoices.choices,
default=DiscoverFromChoices.OTHER,
)

task_priority = models.IntegerField(
default=10, validators=[MaxValueValidator(10), MinValueValidator(1)]
)
is_robot = models.BooleanField(default=False)
# meta
class Meta:
verbose_name_plural = "User Profiles"
11 changes: 11 additions & 0 deletions authentication/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from django.conf import settings
from django.db.models.signals import post_save
from django.dispatch import receiver

from authentication.models import UserProfile


@receiver(post_save, sender=settings.AUTH_USER_MODEL)
def post_save_user(sender, instance, created, **kwargs):
if created:
UserProfile.objects.create(user=instance)
11 changes: 6 additions & 5 deletions intel_owl/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ def get_queue_name(queue: str) -> str:
BROKER_TRANSPORT_OPTIONS["access_key_id"] = settings.AWS_ACCESS_KEY_ID
BROKER_TRANSPORT_OPTIONS["secret_access_key"] = settings.AWS_SECRET_ACCESS_KEY
else:
BROKER_TRANSPORT_OPTIONS = {}
BROKER_TRANSPORT_OPTIONS = {
"priority_steps": list(range(10)),
"sep": ":",
"queue_order_strategy": "priority",
}

task_queues = [
Queue(
get_queue_name(key),
routing_key=key,
)
Queue(get_queue_name(key), routing_key=key, queue_arguments={"x-max-priority": 10})
for key in settings.CELERY_QUEUES
]
if not settings.AWS_SQS:
Expand Down

0 comments on commit b8f59bb

Please sign in to comment.