Skip to content

Commit

Permalink
feat (TS): RequestTasks use getBulkRequestStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Aug 16, 2024
1 parent 7e33023 commit 7af5cfc
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def export_getBulkRequestStatus(cls, requestIDs):
"""get requests statuses given their ids"""
res = cls.__requestDB.getBulkRequestStatus(requestIDs)
if not res["OK"]:
gLogger.error(f"getRequestStatus: {res['Message']}")
gLogger.error("getRequestStatus", res["Message"])
return res

types_getRequestFileStatus = [int, [str, list]]
Expand Down
80 changes: 56 additions & 24 deletions src/DIRAC/TransformationSystem/Client/RequestTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,26 +317,49 @@ def getSubmittedTaskStatus(self, taskDicts):
Check if tasks changed status, and return a list of tasks per new status
"""
updateDict = {}
badRequestID = 0
externalIDs = [
int(taskDict["ExternalID"])
for taskDict in taskDicts
if taskDict["ExternalID"] and int(taskDict["ExternalID"])
]

# Count how many tasks don't have an valid external ID
badRequestID = len(taskDicts) - len(externalIDs)

res = self.requestClient.getBulkRequestStatus(externalIDs)
if not res["OK"]:
# We need a transformationID for the log, and although we expect a single one,
# do things ~ properly
tids = list({taskDict["TransformationID"] for taskDict in taskDicts})
try:
tid = tids[0]
except IndexError:
tid = 0

self._logWarn(
"getSubmittedTaskStatus: Failed to get bulk requestIDs",
res["Message"],
transID=tid,
)
return S_OK({})
new_statuses = res["Value"]

for taskDict in taskDicts:
oldStatus = taskDict["ExternalStatus"]
# ExternalID is normally a string
if taskDict["ExternalID"] and int(taskDict["ExternalID"]):
newStatus = self.requestClient.getRequestStatus(taskDict["ExternalID"])
if not newStatus["OK"]:
log = self._logVerbose if "not exist" in newStatus["Message"] else self._logWarn
log(
"getSubmittedTaskStatus: Failed to get requestID for request",
newStatus["Message"],
transID=taskDict["TransformationID"],
)
else:
newStatus = newStatus["Value"]
# We don't care updating the tasks to Assigned while the request is being processed
if newStatus != oldStatus and newStatus != "Assigned":
updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])

newStatus = new_statuses.get(taskDict["ExternalID"])
if not newStatus:
self._logVerbose(
"getSubmittedTaskStatus: Failed to get requestID for request",
f"No such RequestID {taskDict['ExternalID']}",
transID=taskDict["TransformationID"],
)
else:
badRequestID += 1
# We do not update the tasks status if the Request is Assigned, as it is a very temporary status
if newStatus != oldStatus and newStatus != "Assigned":
updateDict.setdefault(newStatus, []).append(taskDict["TaskID"])

if badRequestID:
self._logWarn("%d requests have identifier 0" % badRequestID)
return S_OK(updateDict)
Expand All @@ -363,26 +386,35 @@ def getSubmittedFileStatus(self, fileDicts):
requestFiles = {}
for taskDict in res["Value"]:
taskID = taskDict["TaskID"]
externalID = taskDict["ExternalID"]
externalID = int(taskDict["ExternalID"])
# Only consider tasks that are submitted, ExternalID is a string
if taskDict["ExternalStatus"] != "Created" and externalID and int(externalID):
requestFiles[externalID] = taskFiles[taskID]

res = self.requestClient.getBulkRequestStatus(list(requestFiles))
if not res["OK"]:
self._logWarn(
"Failed to get request status",
res["Message"],
transID=transID,
method="getSubmittedFileStatus",
)
return S_OK({})
reqStatuses = res["Value"]

updateDict = {}
for requestID, lfnList in requestFiles.items():
# We only take request in final state to avoid race conditions
# https://github.com/DIRACGrid/DIRAC/issues/7116#issuecomment-2188740414
reqStatus = self.requestClient.getRequestStatus(requestID)
if not reqStatus["OK"]:
log = self._logVerbose if "not exist" in reqStatus["Message"] else self._logWarn
log(
reqStatus = reqStatuses.get(requestID)
if not reqStatus:
self._logVerbose(
"Failed to get request status",
reqStatus["Message"],
f"Request {requestID} does not exist",
transID=transID,
method="getSubmittedFileStatus",
)
continue
reqStatus = reqStatus["Value"]
if reqStatus not in Request.FINAL_STATES:
continue

Expand All @@ -398,7 +430,7 @@ def getSubmittedFileStatus(self, fileDicts):
continue

# If we are here, it means the Request is in a final state.
# In principle, you could expect everyfile also be in a final state
# In principle, you could expect every file also be in a final state
# but this is only true for simple Request.
# Hence, the file is marked as PROCESSED only if the file status is Done
# In any other case, we mark it problematic
Expand Down

0 comments on commit 7af5cfc

Please sign in to comment.