job_manager --> jobs; split jobs router into multiple jobs router modules; simplify status routes for jobs router

ryuwd committed Sep 16, 2024
1 parent 5ee5871 commit 6240cda
Showing 10 changed files with 865 additions and 948 deletions.
191 changes: 75 additions & 116 deletions diracx-db/src/diracx/db/sql/utils/job_status.py
```diff
@@ -16,11 +16,47 @@
 from .. import JobDB, JobLoggingDB, SandboxMetadataDB, TaskQueueDB
 
 
+async def set_job_statuses(
+    job_update: dict[int, dict[datetime, JobStatusUpdate]],
+    config: Config,
+    job_db: JobDB,
+    job_logging_db: JobLoggingDB,
+    task_queue_db: TaskQueueDB,
+    background_task: BackgroundTasks,
+    force: bool = False,
+):
+    """Bulk operation setting status on multiple job IDs, returning a dictionary of job ID to result.
+    This is done by calling set_job_status for each ID and status dictionary provided within a ForgivingTaskGroup.
+    """
+    async with ForgivingTaskGroup() as tg:
+        results = [
+            tg.create_task(
+                set_job_status(
+                    job_id,
+                    status_dict,
+                    config,
+                    job_db,
+                    job_logging_db,
+                    task_queue_db,
+                    background_task,
+                    force=force,
+                )
+            )
+            for job_id, status_dict in job_update.items()
+        ]
+
+    return {job_id: status for job_id, status in zip(job_update.keys(), results)}
+
+
 async def set_job_status(
     job_id: int,
     status: dict[datetime, JobStatusUpdate],
     config: Config,
     job_db: JobDB,
     job_logging_db: JobLoggingDB,
+    task_queue_db: TaskQueueDB,
+    background_task: BackgroundTasks,
     force: bool = False,
 ) -> SetJobStatusReturn:
     """Set various status fields for job specified by its jobId.
```
```diff
@@ -118,133 +154,56 @@ async def set_job_status(
     if not endTime and newEndTime:
         job_data["EndExecTime"] = newEndTime
 
-    if job_data:
-        await job_db.setJobAttributes(job_id, job_data)
-
-    for updTime in updateTimes:
-        sDict = statusDict[updTime]
-        if not sDict.get("Status"):
-            sDict["Status"] = "idem"
-        if not sDict.get("MinorStatus"):
-            sDict["MinorStatus"] = "idem"
-        if not sDict.get("ApplicationStatus"):
-            sDict["ApplicationStatus"] = "idem"
-        if not sDict.get("Source"):
-            sDict["Source"] = "Unknown"
-
-        await job_logging_db.insert_record(
-            job_id,
-            sDict["Status"],
-            sDict["MinorStatus"],
-            sDict["ApplicationStatus"],
-            updTime,
-            sDict["Source"],
-        )
-
-    return SetJobStatusReturn(**job_data)
-
-
-class ForgivingTaskGroup(asyncio.TaskGroup):
-    # Hacky way, check https://stackoverflow.com/questions/75250788/how-to-prevent-python3-11-taskgroup-from-canceling-all-the-tasks
-    # Basically we're using this because we want to wait for all tasks to finish, even if one of them raises an exception
-    def _abort(self):
-        return None
-
-
-async def delete_jobs(
-    job_ids: list[int],
-    config: Config,
-    job_db: JobDB,
-    job_logging_db: JobLoggingDB,
-    task_queue_db: TaskQueueDB,
-    background_task: BackgroundTasks,
-):
-    """Removing jobs from task queues, send a kill command and set status to DELETED.
-
-    :raises: BaseExceptionGroup[JobNotFound] for every job that was not found.
-    """
-    await _remove_jobs_from_task_queue(job_ids, config, task_queue_db, background_task)
-    # TODO: implement StorageManagerClient
-    # returnValueOrRaise(StorageManagerClient().killTasksBySourceTaskID(job_ids))
-
-    async with ForgivingTaskGroup() as task_group:
-        for job_id in job_ids:
-            task_group.create_task(job_db.set_job_command(job_id, "Kill"))
-
-            task_group.create_task(
-                set_job_status(
-                    job_id,
-                    {
-                        datetime.now(timezone.utc): JobStatusUpdate(
-                            Status=JobStatus.DELETED,
-                            MinorStatus="Checking accounting",
-                            Source="job_manager",
-                        )
-                    },
-                    job_db,
-                    job_logging_db,
-                    force=True,
-                )
-            )
-
-
-async def kill_jobs(
-    job_ids: list[int],
-    config: Config,
-    job_db: JobDB,
-    job_logging_db: JobLoggingDB,
-    task_queue_db: TaskQueueDB,
-    background_task: BackgroundTasks,
-):
-    """Kill jobs by removing them from the task queues, set kill as a job command and setting the job status to KILLED.
-
-    :raises: BaseExceptionGroup[JobNotFound] for every job that was not found.
-    """
-    await _remove_jobs_from_task_queue(job_ids, config, task_queue_db, background_task)
-    # TODO: implement StorageManagerClient
-    # returnValueOrRaise(StorageManagerClient().killTasksBySourceTaskID(job_ids))
-
-    async with ForgivingTaskGroup() as task_group:
-        for job_id in job_ids:
-            task_group.create_task(job_db.set_job_command(job_id, "Kill"))
-            task_group.create_task(
-                set_job_status(
-                    job_id,
-                    {
-                        datetime.now(timezone.utc): JobStatusUpdate(
-                            Status=JobStatus.KILLED,
-                            MinorStatus="Marked for termination",
-                            Source="job_manager",
-                        )
-                    },
-                    job_db,
-                    job_logging_db,
-                    force=True,
-                )
-            )
-
-    # TODO: Consider using the code below instead, probably more stable but less performant
-    # errors = []
-    # for job_id in job_ids:
-    #     try:
-    #         await job_db.set_job_command(job_id, "Kill")
-    #         await set_job_status(
-    #             job_id,
-    #             {
-    #                 datetime.now(timezone.utc): JobStatusUpdate(
-    #                     Status=JobStatus.KILLED,
-    #                     MinorStatus="Marked for termination",
-    #                     Source="job_manager",
-    #                 )
-    #             },
-    #             job_db,
-    #             job_logging_db,
-    #             force=True,
-    #         )
-    #     except JobNotFound as e:
-    #         errors.append(e)
-
-    # if errors:
-    #     raise BaseExceptionGroup("Some job ids were not found", errors)
+    #####################################################################################################
+    async with asyncio.TaskGroup() as tg:
+        # delete or kill job, if we transition to DELETED or KILLED state
+        # TODO
+        if new_status in [JobStatus.DELETED, JobStatus.KILLED]:
+            tg.create_task(
+                _remove_jobs_from_task_queue(
+                    [job_id], config, task_queue_db, background_task
+                )
+            )
+
+            # TODO: implement StorageManagerClient
+            # returnValueOrRaise(StorageManagerClient().killTasksBySourceTaskID(job_ids))
+
+            tg.create_task(job_db.set_job_command(job_id, "Kill"))
+
+        # Update database tables
+        if job_data:
+            tg.create_task(job_db.setJobAttributes(job_id, job_data))
+
+        for updTime in updateTimes:
+            sDict = statusDict[updTime]
+            if not sDict.get("Status"):
+                sDict["Status"] = "idem"
+            if not sDict.get("MinorStatus"):
+                sDict["MinorStatus"] = "idem"
+            if not sDict.get("ApplicationStatus"):
+                sDict["ApplicationStatus"] = "idem"
+            if not sDict.get("Source"):
+                sDict["Source"] = "Unknown"
+
+            tg.create_task(
+                job_logging_db.insert_record(
+                    job_id,
+                    sDict["Status"],
+                    sDict["MinorStatus"],
+                    sDict["ApplicationStatus"],
+                    updTime,
+                    sDict["Source"],
+                )
+            )
+
+    return SetJobStatusReturn(**job_data)
+
+
+class ForgivingTaskGroup(asyncio.TaskGroup):
+    # Hacky way, check https://stackoverflow.com/questions/75250788/how-to-prevent-python3-11-taskgroup-from-canceling-all-the-tasks
+    # Basically we're using this because we want to wait for all tasks to finish, even if one of them raises an exception
+    def _abort(self):
+        return None
 
 
 async def remove_jobs(
```
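For readers unfamiliar with the `ForgivingTaskGroup` trick used above (see the linked Stack Overflow thread): overriding `_abort` stops a failing child from cancelling its siblings, while `__aexit__` still collects every exception into a `BaseExceptionGroup`. A minimal, self-contained sketch of the behavior, independent of diracx (`flaky` and `main` are made-up names):

```python
import asyncio


class ForgivingTaskGroup(asyncio.TaskGroup):
    # No-op abort: a failing child no longer cancels its siblings, so every
    # task runs to completion; errors are still raised together on exit.
    # Note this relies on _abort, a private CPython implementation detail.
    def _abort(self):
        return None


async def flaky(i: int) -> int:
    if i == 2:
        raise ValueError(f"task {i} failed")
    return i * 10


async def main() -> None:
    tasks = {}
    try:
        async with ForgivingTaskGroup() as tg:
            for i in range(4):
                tasks[i] = tg.create_task(flaky(i))
    except BaseExceptionGroup as eg:
        print("failures:", [str(e) for e in eg.exceptions])
    # Siblings of the failed task still finished normally:
    for i, task in tasks.items():
        if not task.cancelled() and task.exception() is None:
            print(i, "->", task.result())


asyncio.run(main())
```

Because `_abort` is a private `asyncio` API, this can break across Python versions, which is presumably why the commit's own comment calls the approach hacky.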
6 changes: 3 additions & 3 deletions diracx-routers/pyproject.toml
```diff
@@ -48,14 +48,14 @@ types = [
 ]
 
 [project.entry-points."diracx.services"]
-jobs = "diracx.routers.job_manager:router"
+jobs = "diracx.routers.jobs:router"
 config = "diracx.routers.configuration:router"
 auth = "diracx.routers.auth:router"
 ".well-known" = "diracx.routers.auth.well_known:router"
 
 [project.entry-points."diracx.access_policies"]
-WMSAccessPolicy = "diracx.routers.job_manager.access_policies:WMSAccessPolicy"
-SandboxAccessPolicy = "diracx.routers.job_manager.access_policies:SandboxAccessPolicy"
+WMSAccessPolicy = "diracx.routers.jobs.access_policies:WMSAccessPolicy"
+SandboxAccessPolicy = "diracx.routers.jobs.access_policies:SandboxAccessPolicy"
 
 
 [tool.setuptools.packages.find]
```
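These renamed entry points are resolved through Python's standard entry-point machinery; diracx's own loader is not shown in this diff, but a minimal sketch of the underlying mechanism (standard library only) looks like this:

```python
from importlib.metadata import entry_points

# Enumerate everything registered under the "diracx.services" group.
for ep in entry_points(group="diracx.services"):
    # After this commit the "jobs" entry point reads
    # "diracx.routers.jobs:router" instead of "diracx.routers.job_manager:router".
    print(ep.name, "->", ep.value)
    router = ep.load()  # imports the target module and returns its `router` attribute
```

Because consumers resolve the router through the entry-point name (`jobs`), the module rename stays invisible to them as long as these strings are kept in sync with the package layout.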
