Skip to content

Commit

Permalink
feat (TS): further speedups
Browse files Browse the repository at this point in the history
  • Loading branch information
chaen committed Sep 23, 2024
1 parent 7610e2d commit 3652097
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/DIRAC/DataManagementSystem/Utilities/ResolveSE.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
""" This module allows to resolve output SEs for Job based
on SE and site/country association
"""

from random import shuffle

from DIRAC import gLogger, gConfig
Expand Down Expand Up @@ -70,7 +71,6 @@ def getDestinationSEList(outputSE, site, outputmode="Any"):
raise RuntimeError(localSEs["Message"])
localSEs = localSEs["Value"]
sLog.verbose("Local SE list is:", ", ".join(localSEs))

# There is an alias defined for this Site
associatedSEs = gConfig.getValue(f"/Resources/Sites/{prefix}/{site}/AssociatedSEs/{outputSE}", [])
if associatedSEs:
Expand Down
9 changes: 6 additions & 3 deletions src/DIRAC/TransformationSystem/Agent/TransformationAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
:dedent: 2
:caption: TransformationAgent options
"""

import time
import os
import datetime
Expand Down Expand Up @@ -241,7 +242,7 @@ def processTransformation(self, transDict, clients):
if transID not in self.replicaCache:
self.__readCache(transID)
transFiles = transFiles["Value"]
unusedLfns = [f["LFN"] for f in transFiles]
unusedLfns = {f["LFN"] for f in transFiles}
unusedFiles = len(unusedLfns)

plugin = transDict.get("Plugin", "Standard")
Expand All @@ -250,7 +251,7 @@ def processTransformation(self, transDict, clients):
maxFiles = Operations().getValue(f"TransformationPlugins/{plugin}/MaxFilesToProcess", 0)
# Get plugin-specific limit in number of files (0 means no limit)
totLfns = len(unusedLfns)
lfnsToProcess = self.__applyReduction(unusedLfns, maxFiles=maxFiles)
lfnsToProcess = set(self.__applyReduction(unusedLfns, maxFiles=maxFiles))
if len(lfnsToProcess) != totLfns:
self._logInfo(
"Reduced number of files from %d to %d" % (totLfns, len(lfnsToProcess)),
Expand Down Expand Up @@ -533,8 +534,10 @@ def _getDataReplicasDM(self, transID, lfns, clients, forJobs=True, ignoreMissing
method=method,
transID=transID,
)
successful_set = set(replicas["Successful"])
failed_set = set(replicas["Failed"])
# If files are neither Successful nor Failed, they are set problematic in the FC
problematicLfns = [lfn for lfn in lfns if lfn not in replicas["Successful"] and lfn not in replicas["Failed"]]
problematicLfns = [lfn for lfn in lfns if lfn not in successful_set and lfn not in failed_set]
if problematicLfns:
self._logInfo(f"{len(problematicLfns)} files found problematic in the catalog, set ProbInFC")
res = clients["TransformationClient"].setFileStatusForTransformation(
Expand Down

0 comments on commit 3652097

Please sign in to comment.