diff --git a/integration_tests.py b/integration_tests.py
index 6f1130134e7..94f076c0794 100755
--- a/integration_tests.py
+++ b/integration_tests.py
@@ -139,22 +139,24 @@ def create(
     prepare_environment(flags, editable, extra_module, release_var)
     install_server()
     install_client()
-    error = 0
+    exit_code = 0
     if run_server_tests:
         try:
             test_server()
         except TestExit as e:
-            error += e.exit_code
+            exit_code += e.exit_code
    else:
        raise NotImplementedError()
    if run_client_tests:
        try:
            test_client()
        except TestExit as e:
-            error += e.exit_code
+            exit_code += e.exit_code
    else:
        raise NotImplementedError()
-    raise typer.Exit(0)
+    if exit_code != 0:
+        typer.secho("One or more tests failed", err=True, fg=c.RED)
+    raise typer.Exit(exit_code)


 @app.command()
diff --git a/release.notes b/release.notes
index 4d17c37ca0b..824d148ec5b 100644
--- a/release.notes
+++ b/release.notes
@@ -28,6 +28,13 @@ CHANGE: (#4937) removed StatesMonitoringAgent (use StatesAccountingAgent agent i
 *tests
 CHANGE: (#5046) don't use mail in the self generated certificates

+[v7r2p5]
+
+FIX: fixes from v7r0p56, v7r1p39
+
+*WMS
+CHANGE: (#5102) JobCleaningAgent will first DELETE and only then REMOVE jobs
+
 [v7r2p4]

 *Core
@@ -199,6 +206,11 @@ NEW: (#4910) --runslow option on unit tests to allow faster local tests
 NEW: (#4938) added a helloworld test for the (yet to be implemented) cloud testing in certification
 CHANGE: (#4968) Change the defaults for tests (to MySQL 8 and ES 7)

+[v7r1p39]
+
+*WMS
+CHANGE: (#5121) for HTCondor, the SiteDirector writes the executable in the globally defined working directory
+
 [v7r1p38]

 FIX: fixes from v7r0p55
@@ -828,6 +840,14 @@ FIX: (#4551) align ProxyDB test to current changes
 NEW: (#4289) Document how to run integration tests in docker
 NEW: (#4551) add DNProperties description to Registry/Users subsection

+[v7r0p56]
+
+*Resources
+FIX: (#5119) HTCondorCE: limit calls to the actual cleanup (finding and deleting files on disk) to
+  once per minute per SiteDirector, fixes #5118
+CHANGE: (#5119) HTCondorCE cleanup: run the DIRAC_ executable purge with -O3 and -maxdepth
+  1 to speed up the find
+
 [v7r0p55]

 *TS
diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
index d355709cb2a..529eb2caa13 100644
--- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
+++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py
@@ -62,7 +62,9 @@
 except ImportError:
   # Python 3's subprocess module contains a compatibility layer
   import subprocess as commands
+import datetime
 import errno
+import threading

 from DIRAC import S_OK, S_ERROR, gConfig
 from DIRAC.Resources.Computing.ComputingElement import ComputingElement
@@ -164,6 +166,10 @@ class HTCondorCEComputingElement(ComputingElement):
     implementing the functions jobSubmit, getJobOutput
   """

+  # class-level variables ensuring at most one cleanup per minute
+  _lastCleanupTime = datetime.datetime.utcnow()
+  _cleanupLock = threading.Lock()
+
   #############################################################################
   def __init__(self, ceUniqueID):
     """ Standard constructor.
@@ -529,21 +535,33 @@ def __cleanup(self):
     # FIXME: again some issue with the working directory...
     # workingDirectory = self.ceParameters.get( 'WorkingDirectory', DEFAULT_WORKINGDIRECTORY )

+    if not HTCondorCEComputingElement._cleanupLock.acquire(False):
+      return
+
+    now = datetime.datetime.utcnow()
+    if (now - HTCondorCEComputingElement._lastCleanupTime).total_seconds() < 60:
+      HTCondorCEComputingElement._cleanupLock.release()
+      return
+
+    HTCondorCEComputingElement._lastCleanupTime = now
+
     self.log.debug("Cleaning working directory: %s" % self.workingDirectory)

     # remove all files older than 120 minutes starting with DIRAC_ Condor will
     # push files on submission, but it takes at least a few seconds until this
     # happens so we can't directly unlink after condor_submit
-    status, stdout = commands.getstatusoutput('find %s -mmin +120 -name "DIRAC_*" -delete ' % self.workingDirectory)
+    status, stdout = commands.getstatusoutput('find -O3 %s -maxdepth 1 -mmin +120 -name "DIRAC_*" -delete ' %
+                                              self.workingDirectory)
     if status:
       self.log.error("Failure during HTCondorCE __cleanup", stdout)

-    # remove all out/err/log files older than "DaysToKeepLogs" days in the CE part of the working Directory
-    workDir = os.path.join(self.workingDirectory, self.ceName)
-    findPars = dict(workDir=workDir, days=self.daysToKeepLogs)
+    # remove all out/err/log files older than "DaysToKeepLogs" days in the working directory;
+    # this is no longer run per CE, so it is a single global cleanup
+    findPars = dict(workDir=self.workingDirectory, days=self.daysToKeepLogs)
     # remove all out/err/log files older than "DaysToKeepLogs" days
     status, stdout = commands.getstatusoutput(
         r'find %(workDir)s -mtime +%(days)s -type f \( -name "*.out" -o -name "*.err" -o -name "*.log" \) -delete ' %
         findPars)
     if status:
       self.log.error("Failure during HTCondorCE __cleanup", stdout)
+    self._cleanupLock.release()
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
index eda8666f113..ac350ab47c8 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py
@@ -33,7 +33,7 @@

 import os

-from DIRAC import S_OK
+from DIRAC import S_OK, S_ERROR
 from DIRAC.Core.Base.AgentModule import AgentModule
 from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
 from DIRAC.RequestManagementSystem.Client.Request import Request
@@ -42,11 +42,9 @@
 from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
 from DIRAC.WorkloadManagementSystem.Client import JobStatus
 from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
-from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB
-from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB
 from DIRAC.WorkloadManagementSystem.Client.SandboxStoreClient import SandboxStoreClient
 from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
-from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
+from DIRAC.WorkloadManagementSystem.Client.WMSClient import WMSClient

 import DIRAC.Core.Utilities.Time as Time

@@ -63,15 +61,9 @@ def __init__(self, *args, **kwargs):

     # clients
     self.jobDB = None
-    self.taskQueueDB = None
-    self.jobLoggingDB = None

     self.maxJobsAtOnce = 100
-    self.jobByJob = False
-    self.throttlingPeriod = 0.
-
     self.prodTypes = []
-
     self.removeStatusDelay = {}
     self.removeStatusDelayHB = {}

@@ -80,11 +72,8 @@ def initialize(self):
     """ Sets defaults
     """
-    self.am_setOption("PollingTime", 120)
     self.jobDB = JobDB()
-    self.taskQueueDB = TaskQueueDB()
-    self.jobLoggingDB = JobLoggingDB()
-    # self.sandboxDB = SandboxDB( 'SandboxDB' )
+
     agentTSTypes = self.am_getOption('ProductionTypes', [])
     if agentTSTypes:
       self.prodTypes = agentTSTypes
@@ -121,24 +110,29 @@ def _getAllowedJobTypes(self):
     return S_OK(cleanJobTypes)

   def execute(self):
-    """ Remove jobs in various status
+    """ Remove or delete jobs in various statuses
     """
-    # Delete jobs in "Deleted" state
+
+    # TODO: check the WMS SM before calling the functions below (v7r3)
+
+    # First, fully remove jobs in JobStatus.DELETED state
     result = self.removeJobsByStatus({'Status': JobStatus.DELETED})
     if not result['OK']:
-      return result
+      self.log.error('Failed to remove jobs with status %s' % JobStatus.DELETED)

-    # Get all the Job types that can be cleaned
+    # Second: set the status to JobStatus.DELETED for certain jobs
+
+    # Get all the Job types for which we can set the status to JobStatus.DELETED
     result = self._getAllowedJobTypes()
     if not result['OK']:
       return result

-    # No jobs in the system subject to removal
+    # No jobs in the system subject to deletion
     if not result['Value']:
       return S_OK()

     baseCond = {'JobType': result['Value']}
-    # Remove jobs with final status
+    # Delete jobs with final status
     for status in self.removeStatusDelay:
       delay = self.removeStatusDelay[status]
       if delay < 0:
@@ -148,9 +142,9 @@ def execute(self):
       if status != 'Any':
         condDict['Status'] = status
       delTime = str(Time.dateTime() - delay * Time.day)
-      result = self.removeJobsByStatus(condDict, delTime)
+      result = self.deleteJobsByStatus(condDict, delTime)
       if not result['OK']:
-        self.log.warn('Failed to remove jobs in status %s' % status)
+        self.log.error('Failed to delete jobs', 'with condDict %s' % condDict)

     if self.maxHBJobsAtOnce > 0:
       for status, delay in self.removeStatusDelayHB.items():
@@ -160,43 +154,78 @@ def execute(self):

     return S_OK()

   def removeJobsByStatus(self, condDict, delay=False):
-    """ Remove deleted jobs
-    """
-    if delay:
-      self.log.verbose("Removing jobs with %s and older than %s day(s)" % (condDict, delay))
-      result = self.jobDB.selectJobs(condDict, older=delay, limit=self.maxJobsAtOnce)
-    else:
-      self.log.verbose("Removing jobs with %s " % condDict)
-      result = self.jobDB.selectJobs(condDict, limit=self.maxJobsAtOnce)
+    """ Fully remove jobs that are already in status "DELETED", unless they still have Requests to complete.
-    if not result['OK']:
-      return result
+
+    :param dict condDict: a dict like {'JobType': 'User', 'Status': 'Killed'}
+    :param int delay: days of delay
+    :returns: S_OK/S_ERROR
+    """

-    jobList = [int(jID) for jID in result['Value']]
-    if len(jobList) > self.maxJobsAtOnce:
-      jobList = jobList[:self.maxJobsAtOnce]
+    res = self._getJobsList(condDict, delay)
+    if not res['OK']:
+      return res
+    jobList = res['Value']
     if not jobList:
       return S_OK()

-    self.log.notice("Attempting to delete jobs", "(%d for %s)" % (len(jobList), condDict))
+    self.log.notice("Attempting to remove jobs", "(%d for %s)" % (len(jobList), condDict))

     # remove from jobList those that have still Operations to do in RMS
     res = ReqClient().getRequestIDsForJobs(jobList)
     if not res['OK']:
       return res
     if res['Value']['Successful']:
-      self.log.warn("Some jobs won't be removed, as still having Requests to complete",
+      self.log.info("Some jobs won't be removed, as they still have Requests to complete",
                     "(n=%d)" % len(res['Value']['Successful']))
       jobList = list(set(jobList).difference(set(res['Value']['Successful'])))

     if not jobList:
       return S_OK()

+    ownerJobsDict = self._getOwnerJobsDict(jobList)
+
+    fail = False
+    for owner, jobsList in ownerJobsDict.items():
+      ownerDN = owner.split(';')[0]
+      ownerGroup = owner.split(';')[1]
+      self.log.verbose(
+          "Attempting to remove jobs",
+          "(n=%d) for %s : %s" % (len(jobsList), ownerDN, ownerGroup))
+      wmsClient = WMSClient(useCertificates=True, delegatedDN=ownerDN, delegatedGroup=ownerGroup)
+      result = wmsClient.removeJob(jobsList)
+      if not result['OK']:
+        self.log.error(
+            "Could not remove jobs",
+            "for %s : %s (n=%d) : %s" % (ownerDN, ownerGroup, len(jobsList), result['Message']))
+        fail = True
+
+    if fail:
+      return S_ERROR()
+
+    return S_OK()
+
+  def deleteJobsByStatus(self, condDict, delay=False):
+    """ Set the job status to "DELETED" for jobs matching condDict.
+
+    :param dict condDict: a dict like {'JobType': 'User', 'Status': 'Killed'}
+    :param int delay: days of delay
+    :returns: S_OK/S_ERROR
+    """
+
+    res = self._getJobsList(condDict, delay)
+    if not res['OK']:
+      return res
+    jobList = res['Value']
+    if not jobList:
+      return S_OK()
+
+    self.log.notice("Attempting to delete jobs", "(%d for %s)" % (len(jobList), condDict))
+
     result = SandboxStoreClient(useCertificates=True).unassignJobs(jobList)
     if not result['OK']:
       self.log.error("Cannot unassign jobs to sandboxes", result['Message'])
       return result

-    result = self.deleteJobOversizedSandbox(jobList)
+    result = self.deleteJobOversizedSandbox(jobList)  # This might set a request
     if not result['OK']:
       self.log.error(
           "Cannot schedule removal of oversized sandboxes", result['Message'])
@@ -208,15 +237,73 @@ def removeJobsByStatus(self, condDict, delay=False):
     if not jobList:
       return S_OK()

-    result = JobManagerClient().removeJob(jobList)
+    ownerJobsDict = self._getOwnerJobsDict(jobList)
+
+    fail = False
+    for owner, jobsList in ownerJobsDict.items():
+      ownerDN = owner.split(';')[0]
+      ownerGroup = owner.split(';')[1]
+      self.log.verbose(
+          "Attempting to delete jobs",
+          "(n=%d) for %s : %s" % (len(jobsList), ownerDN, ownerGroup))
+      wmsClient = WMSClient(useCertificates=True, delegatedDN=ownerDN, delegatedGroup=ownerGroup)
+      result = wmsClient.deleteJob(jobsList)
+      if not result['OK']:
+        self.log.error(
+            "Could not delete jobs",
+            "for %s : %s (n=%d) : %s" % (ownerDN, ownerGroup, len(jobsList), result['Message']))
+        fail = True
+
+    if fail:
+      return S_ERROR()
+
+    return S_OK()
+
+  def _getJobsList(self, condDict, delay=False):
+    """ Get the list of job IDs matching the given conditions
+
+    :param dict condDict: a dict like {'JobType': 'User', 'Status': 'Killed'}
+    :param int delay: days of delay
+    :returns: S_OK with the list of job IDs
+    """
+    if delay:
+      self.log.verbose("Get jobs with %s and older than %s day(s)" % (condDict, delay))
+      result = self.jobDB.selectJobs(condDict, older=delay, limit=self.maxJobsAtOnce)
+    else:
+      self.log.info("Get jobs with %s " % condDict)
+      result = self.jobDB.selectJobs(condDict, limit=self.maxJobsAtOnce)
+
     if not result['OK']:
-      self.log.error("Could not remove jobs", result['Message'])
       return result

-    return S_OK()
+    jobList = [int(jID) for jID in result['Value']]
+    if len(jobList) > self.maxJobsAtOnce:
+      jobList = jobList[:self.maxJobsAtOnce]
+    return S_OK(jobList)
+
+  def _getOwnerJobsDict(self, jobList):
+    """
+    Takes a list of int(JobID) as input and returns a dict grouping the jobs by owner, e.g.
+    {'dn;group': [1, 3, 4], 'dn;group_1': [5], 'dn_1;group': [2]}
+    """
+    res = self.jobDB.getJobsAttributes(jobList, ['OwnerDN', 'OwnerGroup'])
+    if not res['OK']:
+      self.log.error("Could not get the jobs attributes", res['Message'])
+      return res
+    jobsDictAttribs = res['Value']
+
+    ownerJobsDict = {}
+    for jobID, jobDict in jobsDictAttribs.items():
+      ownerJobsDict.setdefault(';'.join(jobDict.values()), []).append(jobID)
+    return ownerJobsDict

   def deleteJobOversizedSandbox(self, jobIDList):
-    """ Delete the job oversized sandbox files from storage elements
+    """
+    Delete the job oversized sandbox files from storage elements.
+    Create a request in the RMS if deletion is not immediately possible.
+
+    :param list jobIDList: list of job IDs
+    :returns: S_OK/S_ERROR
     """

     failed = {}
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
index 6f40099dcbe..35f10f92945 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
@@ -1027,7 +1027,17 @@ def getExecutable(self, queue, proxy=None, jobExecDir='', envVariables=None,
       pilotOptions = []
     pilotOptions = ' '.join(pilotOptions)
     self.log.verbose('pilotOptions: %s' % pilotOptions)
-    executable = self._writePilotScript(workingDirectory=self.workingDirectory,
+
+    # If a global workingDirectory is defined for the CEType (as it is for HTCondor),
+    # use it; otherwise the __cleanup done by HTCondorCE would look in the wrong folder.
+    # Note that this means that if you run multiple HTCondorCEs on one machine,
+    # the executable files will all end up in the same place,
+    # but that does not matter since they are very temporary.
+    ce = self.queueCECache[queue]['CE']
+    workingDirectory = getattr(ce, 'workingDirectory', self.workingDirectory)
+
+    executable = self._writePilotScript(workingDirectory=workingDirectory,
                                         pilotOptions=pilotOptions,
                                         proxy=proxy,
                                         pilotExecDir=jobExecDir,
diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py
index 659140ac3f0..1bc8fb241c7 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobCleaningAgent.py
@@ -41,9 +41,7 @@ def test__getAllowedJobTypes(mocker, mockReplyInput, expected):
   mocker.patch(
       "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.getDistinctJobAttributes",
       side_effect=mockReply)
-  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.TaskQueueDB.__init__", side_effect=mockNone)
   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
-  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobLoggingDB.__init__", side_effect=mockNone)

   jobCleaningAgent = JobCleaningAgent()
   jobCleaningAgent.log = gLogger
@@ -76,9 +74,7 @@ def test_removeJobsByStatus(mocker, conditions, mockReplyInput, expected):
       side_effect=lambda x, y=None: y, create=True
   )
   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
-  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.TaskQueueDB.__init__", side_effect=mockNone)
   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
-  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobLoggingDB.__init__", side_effect=mockNone)

   jobCleaningAgent = JobCleaningAgent()
   jobCleaningAgent.log = gLogger
@@ -91,6 +87,39 @@ def test_removeJobsByStatus(mocker, conditions, mockReplyInput, expected):
   assert result == expected


+@pytest.mark.parametrize(
+    "conditions, mockReplyInput, expected", [
+        ({'JobType': '', 'Status': 'Deleted'}, {'OK': True, 'Value': ''}, {'OK': True, 'Value': None}),
+        ({'JobType': '', 'Status': 'Deleted'}, {'OK': False, 'Message': ''}, {'OK': False, 'Message': ''}),
+        ({'JobType': [], 'Status': 'Deleted'}, {'OK': True, 'Value': ''}, {'OK': True, 'Value': None}),
+        ({'JobType': ['some', 'status'],
+          'Status': ['Deleted', 'Cancelled']}, {'OK': True, 'Value': ''}, {'OK': True, 'Value': None})
+    ])
+def test_deleteJobsByStatus(mocker, conditions, mockReplyInput, expected):
+  """ Testing JobCleaningAgent().deleteJobsByStatus()
+  """
+
+  mockReply.return_value = mockReplyInput
+
+  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__")
+  mocker.patch(
+      "DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule._AgentModule__moduleProperties",
+      side_effect=lambda x, y=None: y, create=True
+  )
+  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.selectJobs", side_effect=mockReply)
+  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB.__init__", side_effect=mockNone)
+
+  jobCleaningAgent = JobCleaningAgent()
+  jobCleaningAgent.log = gLogger
+  jobCleaningAgent.log.setLevel('DEBUG')
+  jobCleaningAgent._AgentModule__configDefaults = mockAM
+  jobCleaningAgent.initialize()
+
+  result = jobCleaningAgent.deleteJobsByStatus(conditions)
+
+  assert result == expected
+
+
 @pytest.mark.parametrize(
     "inputs, params, expected", [
         ([], {'OK': True, 'Value': {}}, {'OK': True, 'Value': {'Failed': {}, 'Successful': {}}}),
@@ -111,9 +140,7 @@ def test_deleteJobOversizedSandbox(mocker, inputs, params, expected):

   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.__init__")
   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule.am_getOption", return_value=mockAM)
-  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.TaskQueueDB", return_value=mockNone)
   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobDB", return_value=mockNone)
-  mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobLoggingDB", return_value=mockNone)
   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.ReqClient", return_value=mockNone)
   mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.JobMonitoringClient", return_value=mockJMC)

diff --git a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py
index 10984faacff..23ea418936d 100644
--- a/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py
+++ b/src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py
@@ -170,6 +170,14 @@ def test__submitPilotsToQueue(mocker):
                                     'OwnerGroup': ['lhcb_user'],
                                     'Setup': 'LHCb-Production',
                                     'Site': 'LCG.CERN.cern'}}}
+
+  # Create a MagicMock that does not have the workingDirectory
+  # attribute (https://cpython-test-docs.readthedocs.io/en/latest/library/unittest.mock.html#deleting-attributes)
+  # This makes the SiteDirector use its own working directory, not the CE one
+  ceMock = MagicMock()
+  del ceMock.workingDirectory
+
+  sd.queueCECache = {'aQueue': {'CE': ceMock}}
   sd.queueSlots = {'aQueue': {'AvailableSlots': 10}}
   res = sd._submitPilotsToQueue(1, MagicMock(), 'aQueue')
   assert res['OK'] is True
diff --git a/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py b/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py
index f11ceededa1..3cf5988e383 100755
--- a/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py
+++ b/src/DIRAC/WorkloadManagementSystem/Client/WMSClient.py
@@ -28,6 +28,7 @@ class WMSClient(object):
       submit
       kill
       delete
+      remove
       reschedule
       reset
   """
@@ -213,11 +214,17 @@ def killJob(self, jobID):
     return self.jobManager.killJob(jobID)

   def deleteJob(self, jobID):
-    """ Delete job(s) from the WMS Job database.
+    """ Delete job(s) (i.e. set their status to DELETED) in the WMS Job database.
         jobID can be an integer representing a single DIRAC job ID or a list of IDs
     """
     return self.jobManager.deleteJob(jobID)

+  def removeJob(self, jobID):
+    """ Fully remove job(s) from the WMS Job database.
+        jobID can be an integer representing a single DIRAC job ID or a list of IDs
+    """
+    return self.jobManager.removeJob(jobID)
+
   def rescheduleJob(self, jobID):
     """ Reschedule job(s) in WMS Job database.
         jobID can be an integer representing a single DIRAC job ID or a list of IDs
diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
index 2e852cb54c3..2fd4441d4b5 100755
--- a/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
+++ b/src/DIRAC/WorkloadManagementSystem/DB/JobDB.py
@@ -271,11 +271,56 @@ def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
     else:
       return S_ERROR('JobDB.getAtticJobParameters: failed to retrieve parameters')

+#############################################################################
+  # TODO: the following 3 methods can be merged into 1.
+
+  def getJobsAttributes(self, jobIDs, attrList=None):
+    """ Get all Job(s) Attributes for a given list of jobIDs.
+        Return a dictionary with the job attributes, keyed by jobID.
+    """
+
+    # If no list is given, return all attributes
+    if not attrList:
+      attrList = self.jobAttributeNames
+    if isinstance(attrList, six.string_types):
+      attrList = attrList.replace(' ', '').split(',')
+    attrList.sort()
+
+    if isinstance(jobIDs, six.string_types):
+      jobIDs = jobIDs.replace(' ', '').split(',')
+    if isinstance(jobIDs, int):
+      jobIDs = [jobIDs]
+
+    attrNameListS = []
+    for x in attrList:
+      ret = self._escapeString(x)
+      if not ret['OK']:
+        return ret
+      x = "`" + ret['Value'][1:-1] + "`"
+      attrNameListS.append(x)
+    attrNames = 'JobID,' + ','.join(attrNameListS)
+
+    cmd = 'SELECT %s FROM Jobs WHERE JobID IN (%s)' % (
+        attrNames, ','.join(str(jobID) for jobID in jobIDs))
+    res = self._query(cmd)
+    if not res['OK']:
+      return res
+    if not res['Value']:
+      return S_OK({})
+
+    attributes = {}
+    for t_att in res['Value']:
+      jobID = int(t_att[0])
+      attributes.setdefault(jobID, {})
+      for tx, ax in zip(t_att[1:], attrList):
+        attributes[jobID].setdefault(ax, tx)
+
+    return S_OK(attributes)
+
   #############################################################################
   def getJobAttributes(self, jobID, attrList=None):
     """ Get all Job Attributes for a given jobID.
-        Return a dictionary with all Job Attributes,
-        return an empty dictionary if matching job found
+        Return a dictionary with all Job Attributes.
     """

     ret = self._escapeString(jobID)
@@ -1163,6 +1208,9 @@ def removeJobFromDB(self, jobIDs):
     #   return ret
     # e_jobID = ret['Value']

+    if not jobIDs:
+      return S_OK()
+
     if not isinstance(jobIDs, list):
       jobIDList = [jobIDs]
     else:
diff --git a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
index 8636fb46fa6..3de23787d0f 100755
--- a/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
+++ b/src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py
@@ -120,6 +120,8 @@ def getJobLoggingInfo(self, jobID):
   def deleteJob(self, jobID):
     """ Delete logging records for given jobs
     """
+    if not jobID:
+      return S_OK()

     # Make sure that we have a list of strings of jobIDs
     if isinstance(jobID, six.integer_types):
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
index 2231d8b5b53..eced66fd5aa 100755
--- a/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
+++ b/src/DIRAC/WorkloadManagementSystem/Service/JobManagerHandler.py
@@ -367,31 +367,32 @@ def export_removeJob(self, jobIDs):
     count = 0
     error_count = 0

-    self.log.verbose("Removing jobs", "(n=%d)" % len(validJobList))
-    result = self.jobDB.removeJobFromDB(validJobList)
-    if not result['OK']:
-      self.log.error("Failed to remove jobs from JobDB", "(n=%d)" % len(validJobList))
-    else:
-      self.log.info("Removed jobs from JobDB", "(n=%d)" % len(validJobList))
-
-    for jobID in validJobList:
-      resultTQ = self.taskQueueDB.deleteJob(jobID)
-      if not resultTQ['OK']:
-        self.log.warn("Failed to remove job from TaskQueueDB",
-                      "(%d): %s" % (jobID, resultTQ['Message']))
-        error_count += 1
+    if validJobList:
+      self.log.verbose("Removing jobs", "(n=%d)" % len(validJobList))
+      result = self.jobDB.removeJobFromDB(validJobList)
+      if not result['OK']:
+        self.log.error("Failed to remove jobs from JobDB", "(n=%d)" % len(validJobList))
       else:
-        count += 1
+        self.log.info("Removed jobs from JobDB", "(n=%d)" % len(validJobList))
+
+      for jobID in validJobList:
+        resultTQ = self.taskQueueDB.deleteJob(jobID)
+        if not resultTQ['OK']:
+          self.log.warn("Failed to remove job from TaskQueueDB",
+                        "(%d): %s" % (jobID, resultTQ['Message']))
+          error_count += 1
+        else:
+          count += 1

-    result = self.jobLoggingDB.deleteJob(validJobList)
-    if not result['OK']:
-      self.log.error("Failed to remove jobs from JobLoggingDB", "(n=%d)" % len(validJobList))
-    else:
-      self.log.info("Removed jobs from JobLoggingDB", "(n=%d)" % len(validJobList))
+      result = self.jobLoggingDB.deleteJob(validJobList)
+      if not result['OK']:
+        self.log.error("Failed to remove jobs from JobLoggingDB", "(n=%d)" % len(validJobList))
+      else:
+        self.log.info("Removed jobs from JobLoggingDB", "(n=%d)" % len(validJobList))

-    if count > 0 or error_count > 0:
-      self.log.info("Removed jobs from DB",
-                    "(%d jobs with %d errors)" % (count, error_count))
+      if count > 0 or error_count > 0:
+        self.log.info("Removed jobs from DB",
+                      "(%d jobs with %d errors)" % (count, error_count))

     if invalidJobList or nonauthJobList:
       self.log.error(
@@ -480,7 +481,7 @@ def __killJob(self, jobID, sendKillCommand=True):
     return S_OK()

   def __kill_delete_jobs(self, jobIDList, right):
-    """ Kill or delete jobs as necessary
+    """ Kill (i.e. set the status to "KILLED") or delete (i.e. set the status to "DELETED") jobs as necessary

     :param list jobIDList: job IDs
     :param str right: right
@@ -493,47 +494,49 @@ def __kill_delete_jobs(self, jobIDList, right):

     validJobList, invalidJobList, nonauthJobList, ownerJobList = self.jobPolicy.evaluateJobRights(jobList, right)

-    # Get job status to see what is to be killed or deleted
-    result = self.jobDB.getAttributesForJobList(validJobList, ['Status'])
-    if not result['OK']:
-      return result
-    killJobList = []
-    deleteJobList = []
-    markKilledJobList = []
-    stagingJobList = []
-    for jobID, sDict in result['Value'].items():  # can be an iterator
-      if sDict['Status'] in (JobStatus.RUNNING, JobStatus.MATCHED, JobStatus.STALLED):
-        killJobList.append(jobID)
-      elif sDict['Status'] in (JobStatus.DONE, JobStatus.FAILED, JobStatus.KILLED):
-        if not right == RIGHT_KILL:
-          deleteJobList.append(jobID)
-      else:
-        markKilledJobList.append(jobID)
-      if sDict['Status'] in [JobStatus.STAGING]:
-        stagingJobList.append(jobID)
-
     badIDs = []
-    for jobID in markKilledJobList:
-      result = self.__killJob(jobID, sendKillCommand=False)
-      if not result['OK']:
-        badIDs.append(jobID)
-    for jobID in killJobList:
-      result = self.__killJob(jobID)
+
+    if validJobList:
+      # Get job status to see what is to be killed or deleted
+      result = self.jobDB.getAttributesForJobList(validJobList, ['Status'])
       if not result['OK']:
-        badIDs.append(jobID)
+        return result
+      killJobList = []
+      deleteJobList = []
+      markKilledJobList = []
+      stagingJobList = []
+      for jobID, sDict in result['Value'].items():  # can be an iterator
+        if sDict['Status'] in (JobStatus.RUNNING, JobStatus.MATCHED, JobStatus.STALLED):
+          killJobList.append(jobID)
+        elif sDict['Status'] in (JobStatus.DONE, JobStatus.FAILED, JobStatus.KILLED):
+          if not right == RIGHT_KILL:
+            deleteJobList.append(jobID)
+        else:
+          markKilledJobList.append(jobID)
+        if sDict['Status'] in [JobStatus.STAGING]:
+          stagingJobList.append(jobID)

-    for jobID in deleteJobList:
-      result = self.__deleteJob(jobID)
-      if not result['OK']:
-        badIDs.append(jobID)
+      for jobID in markKilledJobList:
+        result = self.__killJob(jobID, sendKillCommand=False)
+        if not result['OK']:
+          badIDs.append(jobID)

-    if stagingJobList:
-      stagerClient = StorageManagerClient()
-      self.log.info('Going to send killing signal to stager as well!')
-      result = stagerClient.killTasksBySourceTaskID(stagingJobList)
-      if not result['OK']:
-        self.log.warn('Failed to kill some Stager tasks', result['Message'])
+      for jobID in killJobList:
+        result = self.__killJob(jobID)
+        if not result['OK']:
+          badIDs.append(jobID)
+
+      for jobID in deleteJobList:
+        result = self.__deleteJob(jobID)
+        if not result['OK']:
+          badIDs.append(jobID)
+
+      if stagingJobList:
+        stagerClient = StorageManagerClient()
+        self.log.info('Going to send killing signal to stager as well!')
+        result = stagerClient.killTasksBySourceTaskID(stagingJobList)
+        if not result['OK']:
+          self.log.warn('Failed to kill some Stager tasks', result['Message'])

     if nonauthJobList or badIDs:
       result = S_ERROR('Some jobs failed deletion')
diff --git a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py
index edf3b662063..06b35984748 100755
--- a/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py
+++ b/src/DIRAC/WorkloadManagementSystem/Service/JobPolicy.py
@@ -79,8 +79,7 @@ def getUserRightsForJob(self, jobID, owner=None, group=None):
     else:
       return S_ERROR('Job not found')

-    result = self.getJobPolicy(owner, group)
-    return result
+    return self.getJobPolicy(owner, group)

   def __getUserJobPolicy(self):
     """ Get the job rights for the primary user for which the JobPolicy object
diff --git a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
index 4b007ac854a..c2a605a1588 100644
--- a/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
+++ b/tests/Integration/WorkloadManagementSystem/Test_JobDB.py
@@ -6,7 +6,8 @@

 # pylint: disable=wrong-import-position

-from __future__ import print_function, absolute_import
+from __future__ import print_function
+from __future__ import absolute_import
 from __future__ import division

 from datetime import datetime, timedelta
@@ -17,6 +18,9 @@
 parseCommandLine()

 from DIRAC import gLogger
+from DIRAC.WorkloadManagementSystem.Client import JobStatus
+
+# sut
 from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB

 jdl = """[
@@ -63,17 +67,33 @@ def test_insertAndRemoveJobIntoDB():

   res = jobDB.insertNewJobIntoDB(jdl, 'owner', '/DN/OF/owner', 'ownerGroup', 'someSetup')
   assert res['OK'] is True, res['Message']
-  jobID = res['JobID']
+  jobID = int(res['JobID'])
   res = jobDB.getJobAttribute(jobID, 'Status')
   assert res['OK'] is True, res['Message']
-  assert res['Value'] == 'Received'
+  assert res['Value'] == JobStatus.RECEIVED
   res = jobDB.getJobAttribute(jobID, 'MinorStatus')
   assert res['OK'] is True, res['Message']
   assert res['Value'] == 'Job accepted'
+  res = jobDB.getJobAttributes(jobID, ['Status', 'MinorStatus'])
+  assert res['OK'] is True, res['Message']
+  assert res['Value'] == {'Status': JobStatus.RECEIVED, 'MinorStatus': 'Job accepted'}
+  res = jobDB.getJobsAttributes(jobID, ['Status', 'MinorStatus'])
+  assert res['OK'] is True, res['Message']
+  assert res['Value'] == {jobID: {'Status': JobStatus.RECEIVED, 'MinorStatus': 'Job accepted'}}
   res = jobDB.getJobOptParameters(jobID)
   assert res['OK'] is True, res['Message']
   assert res['Value'] == {}

+  res = jobDB.insertNewJobIntoDB(jdl, 'owner', '/DN/OF/owner', 'ownerGroup', 'someSetup')
+  assert res['OK'] is True, res['Message']
+  jobID_2 = int(res['JobID'])
+
+  res = jobDB.getJobsAttributes([jobID, jobID_2], ['Status', 'MinorStatus'])
+  assert res['OK'] is True, res['Message']
+  assert res['Value'] == {
+      jobID: {'Status': JobStatus.RECEIVED, 'MinorStatus': 'Job accepted'},
+      jobID_2: {'Status': JobStatus.RECEIVED, 'MinorStatus': 'Job accepted'}}
+
   res = jobDB.selectJobs({})
   assert res['OK'] is True, res['Message']
   jobs = res['Value']
@@ -93,7 +113,7 @@ def test_rescheduleJob():

   res = jobDB.getJobAttribute(jobID, 'Status')
   assert res['OK'] is True, res['Message']
-  assert res['Value'] == 'Received'
+  assert res['Value'] == JobStatus.RECEIVED
   res = jobDB.getJobAttribute(jobID, 'MinorStatus')
   assert res['OK'] is True, res['Message']
   assert res['Value'] == 'Job Rescheduled'
@@ -136,12 +156,12 @@ def test_heartBeatLogging():
     else:
       assert False, 'Unknown entry: %s: %s' % (name, value)

-  res = jobDB.setJobStatus(jobID, status='Done')
+  res = jobDB.setJobStatus(jobID, status=JobStatus.DONE)
   assert res['OK'] is True, res['Message']

   tomorrow = datetime.today() + timedelta(1)
   delTime = datetime.strftime(tomorrow, '%Y-%m-%d')
-  res = jobDB.removeInfoFromHeartBeatLogging(status='Done', delTime=delTime, maxLines=100)
+  res = jobDB.removeInfoFromHeartBeatLogging(status=JobStatus.DONE, delTime=delTime, maxLines=100)
   assert res['OK'] is True, res['Message']

   res = jobDB.getHeartBeatData(jobID)
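
Reviewer note, not part of the patch: the HTCondorCEComputingElement change above combines a
non-blocking, class-level threading.Lock with a shared timestamp, so that all instances in a
process together run at most one expensive find/delete pass per minute. A minimal, self-contained
sketch of the same pattern is below; the names Cleaner and _do_cleanup are hypothetical and not
DIRAC API.

    import datetime
    import threading


    class Cleaner(object):
        # class-level state shared by every instance, as in HTCondorCEComputingElement
        _lastCleanupTime = datetime.datetime.utcnow()
        _cleanupLock = threading.Lock()

        def cleanup(self):
            # non-blocking acquire: if another thread is already cleaning, skip this round
            if not Cleaner._cleanupLock.acquire(False):
                return
            try:
                now = datetime.datetime.utcnow()
                # throttle: at most one real cleanup per 60 seconds
                if (now - Cleaner._lastCleanupTime).total_seconds() < 60:
                    return
                # stamp before working, so a slow or failing pass is not retried immediately
                Cleaner._lastCleanupTime = now
                self._do_cleanup()
            finally:
                # unlike the patch, release in a finally block so an exception in
                # _do_cleanup() cannot leave the lock held and block all future cleanups
                Cleaner._cleanupLock.release()

        def _do_cleanup(self):
            print("expensive find/delete pass")  # stands in for the find commands


    if __name__ == '__main__':
        Cleaner().cleanup()  # skipped if called within 60 seconds of class definition

As in the patch, the timestamp is initialized at class-definition time, so the first cleanup
happens no earlier than one minute after the module is imported.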