6 TaskManager is a tool for keeping track of JobRunner jobs using a 
    7 database. TaskManager uses the notion of a task as the primary unit for 
    8 bookkeeping. Each task includes one or more jobs run using the same 
    9 JobRunner template and typically resulting from a single invocation of 
   10 JobRunner through one of the makeDPD.py or runDPD.py 
   11 scripts. TaskManager keeps track of the status of tasks and associated 
   12 jobs, runs postprocessing scripts, and accumulates results for later 
   13 analysis across tasks. 
   15 Written by Juerg Beringer (LBNL) in 2009. 
   17 __author__  = 
'Juerg Beringer' 
   18 __version__ = 
'TaskManager.py atlas/athena' 
   22 from InDetBeamSpotExample.Utils 
import getRunFromName
 
   23 from InDetBeamSpotExample.Utils 
import getUserName
 
   27     """Task manager exception class.""" 
   30     """Task manager exception because of a database error""" 
   34     """Convert a tuple from a database query into a dictionary.""" 
   36     for idx, col 
in enumerate(cursor.description):
 
   42     """Get the key for which dictionary d has an entry with value v. 
   43     Returns 'Undefined' if there's no such key or if several keys are found.""" 
   44     l = [k 
for k 
in d.keys() 
if d[k]==v]
 
   52     """Returns 'ok', 'warn' or 'bad' depending on the value of status.""" 
   58     if status 
in statusMap:
 
   59         return statusMap[status]
 
   67     if v 
not in s.split():
 
   72 def getFullTaskNames(taskman,dsname,taskname,requireSingleTask=False,confirmWithUser=False,addWildCards=True):
 
   73     """Retrieve the full dataset and task names given a pair of (dsname,task) that may 
   74        contain wildcards or be just a partial name such as the run number. Depending 
   75        on the requireSingleTask and confirmWithUser settings a TaskManagerCheckError 
   76        is raised if there are multiple tasks or if the user doesn't confirm.""" 
   77     taskList = taskman.getTaskNames(dsname,taskname,addWildCards)
 
   79         raise  TaskManagerCheckError (
'ERROR: No tasks found for dataset = %s, task = %s' % (dsname,taskname))
 
   80     if requireSingleTask 
and len(taskList)!=1:
 
   81         m = 
"ERROR: Multiple data set names found for dataset = %s, task = %s\n       Please use full dataset or task name from list below, using option -n if necessary:\n\n" % (dsname,taskname)
 
   82         m += 
"    %-50s  %s\n" % (
'DATASET NAME',
'TASK NAME')
 
   83         m += 
"    %s\n" % (75*
'-')
 
   85             m += 
"    %-50s  %s\n" % (t[0],t[1])
 
   87         raise TaskManagerCheckError (m)
 
   89         print (
'Please confirm that you want to execute this command for the following tasks:\n')
 
   90         print (
"    %-50s  %s" % (
'DATASET NAME',
'TASK NAME'))
 
   91         print (
"    %s" % (75*
'-'))
 
   93             print (
"    %-50s  %s" % (t[0],t[1]))
 
   94         a = input(
'\nARE YOU SURE [n] ? ')
 
   96             raise TaskManagerCheckError (
'ERROR: Aborted by user')
 
  102     """Read config dict from job files.""" 
  104     configFile = glob.glob(
'%s/%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName,
'*.config.py.final.py'))
 
  106         configFile = glob.glob(
'%s/%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName,
'*.config.py'))
 
  109     return config[
'jobConfig']
 
  113     """A utility class to hold a single parameter to be used in SQL queries.""" 
  117         return '%r' % self.
value 
  121     """TaskManager is a tool for keeping track of JobRunner jobs.""" 
  123     StatusCodes = { 
'UNKNOWN': -1,
 
  131                     'POSTPROCRUNNING': 6,
 
  136     OnDiskCodes = { 
'UNKNOWN': -1,
 
  146             connstring = os.environ.get(
'TASKDB', 
'sqlite_file:taskdata.db')
 
  149             dbtype, dbname = connstring.split(
':', 1)
 
  151             raise ValueError (
'Illegal database connection string {}'.
format(connstring))
 
  153         if dbtype == 
'auth_file':
 
  157                 with open(authfile) 
as af:
 
  158                     connstring = af.read().strip()
 
  159                 dbtype, dbname = connstring.split(
':', 1)
 
  161                 raise ValueError (
'Invalid authorization file {} (not readable or invalid format)'.
format(authfile))
 
  163         return dbtype, dbname
 
  166     def __init__(self, connstring='', createDatabase=False):
 
  167         """Constructor. connstring is a connection string specifying either a SQLite file 
  168            ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file 
  169            ("auth_file:...") with connection information. If connstring is empty, the connection 
  170            information will be taken from TASKDB (if set), or otherwise defaults to 
  171            'sqlite_file:taskdata.db'.""" 
  182             dbexists = os.access(self.
dbname, os.F_OK)
 
  183             if dbexists 
and createDatabase:
 
  184                 raise ValueError (
'SQLite file {} exists already - remove before recreating'.
format(self.
dbname))
 
  185             if not (dbexists 
or createDatabase):
 
  186                 raise ValueError (
'TaskManager database not found (SQLite file {})'.
format(self.
dbname))
 
  190         elif self.
dbtype == 
'oracle':
 
  196                 print (
'ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
 
  202             raise ValueError (
'Unknown database type: {}'.
format(self.
dbtype))
 
  205         ''' Remember that we're inside a 'with' statement so we can warn otherwise: ''' 
  209     def __exit__(self, exc_type, exc_value, traceback):
 
  210         ''' Close the database connection at the end of the 'with' statement. ''' 
  214             print (
'ERROR: Unable to close database connection')
 
  217         """Create the database schema for a SQLite3 database.""" 
  220                 TASKID integer primary key autoincrement, 
  225                 TASKPOSTPROCSTEPS text, 
  235                 NRESULTFILES integer default 0, 
  236                 NJOBS integer default 0, 
  237                 NJOBS_SUBMITTED integer default -1, 
  238                 NJOBS_RUNNING integer default -1, 
  239                 NJOBS_POSTPROC integer default -1, 
  240                 NJOBS_FAILED integer default -1, 
  241                 NJOBS_COMPLETED integer default -1, 
  245                 STATUS_POSTPROC integer, 
  254         """Create the database schema for an Oracle database.""" 
  259         except Exception 
as e:
 
  264                 DSNAME varchar2(256), 
  266                 TASKNAME varchar2(256), 
  267                 TEMPLATE varchar2(256), 
  268                 TASKPOSTPROCSTEPS varchar2(256), 
  269                 ATLREL varchar2(256), 
  271                 CREATED_USER varchar2(256), 
  272                 CREATED_HOST varchar2(256), 
  276                 LOGFILES varchar2(256), 
  277                 COOLTAGS varchar2(256), 
  278                 NRESULTFILES number(10,0) default 0, 
  279                 NJOBS number(10,0) default 0, 
  280                 NJOBS_SUBMITTED number(10,0) default -1, 
  281                 NJOBS_RUNNING number(10,0) default -1, 
  282                 NJOBS_POSTPROC number(10,0) default -1, 
  283                 NJOBS_FAILED number(10,0) default -1, 
  284                 NJOBS_COMPLETED number(10,0) default -1, 
  285                 RESULTFILES varchar2(4000), 
  286                 RESULTLINKS varchar2(4000), 
  287                 TASKCOMMENT varchar2(4000), 
  288                 STATUS_POSTPROC number(5,0) default -1, 
  289                 BASEPATH varchar2(256), 
  290                 AUXDATA varchar2(4000), 
  291                 constraint BEAMSPOT_TASKS_PK primary key (TASKID) 
  296         self.
dbcon.
cursor().
execute(
'alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ  unique (TASKNAME, DSNAME)')
 
  297         self.
dbcon.
cursor().
execute(
'create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
 
  301 create or replace trigger TASKS_ID_TRIGGER 
  302 before insert on TASKS 
  305    select TASKS_SEQ.nextval into :new.TASKID from dual; 
  311     def execute(self,statementParts, doCommit=False, limit=None):
 
  312         """Execute a SQL statement passed as a list or tuple of statement parts, where each 
  313            part is either a partial SQL string, or an object of type DbParam specifying a parameter 
  314            for the SQL statement. The actual SQL statement is assembled (using the parameter style 
  315            of the current database) and executed, and the resulting cursor is returned. 
  316            Loosely follows the method discussed in the Python Cookbook. 
  317            WARNING: At present, limit doesn't work when ordering rows for Oracle!""" 
  319             print (
'**WARN**  TaskManager will keep the database connection open until it is deleted.')
 
  320             print (
'**INFO**  TaskManager should generally only be used inside a with statement:')
 
  321             print (
'**INFO**      with TaskManager(...) as taskman:')
 
  322             print (
'**INFO**        # do something ...')
 
  324         if not statementParts:
 
  326         if isinstance(statementParts,str):
 
  327             raise TypeError (
'Must pass list or tuple to TaskManager.execute')
 
  328         for p 
in statementParts:
 
  329             if not (isinstance(p,DbParam) 
or isinstance(p,str)):
 
  330                 raise ValueError (
'Can only pass SQL string fragments and DbParam objects in list' 
  331                                    'to TaskManager.execute, found %s with "%s"' % (
type(p),
str(p)))
 
  338             for p 
in statementParts:
 
  339                 if isinstance(p,DbParam):
 
  341                     params.append(p.value)
 
  348             for p 
in statementParts:
 
  349                 if isinstance(p,DbParam):
 
  350                     name = 
'p%d' % len(params)
 
  351                     sqlParts.append(
':%s' % name)
 
  352                     params[name] = p.value
 
  357             raise ValueError (
'Unknown SQL parameter style %s' % self.
paramstyle)
 
  359         sql = 
' '.
join(sqlParts)
 
  365                 if 'order by' in sql:   
 
  366                     sqlParts2 = sql.split(
'order by')
 
  367                     sql = 
'%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
 
  369                     sql += 
' and ROWNUM <= %i' % limit
 
  371                 sql += 
' limit %i' % limit
 
  374             print (
'\nExecuting SQL statement:  ',sql)
 
  375             print (
'        with parameters:  ',params)
 
  379             cursor.execute(sql,params)
 
  382         except Exception 
as e:
 
  383             msg = 
'\nDatabase error executing SQL statement\n  %s\nusing parameters\n  %s\n%s' % (sql,params,e)
 
  384             raise TaskManagerDatabaseError (msg)
 
  389         """Delete a task entry from the taskmanager database.""" 
  390         q = [ 
'delete from TASKS where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName) ]
 
  392         return cursor.rowcount
 
  395     def addTask(self,dsName,taskName,template,release,njobs,
 
  396                 taskpostprocsteps='',
 
  397                 status=StatusCodes['SUBMITTED'],
 
  398                 onDisk=OnDiskCodes['ALLONDISK'],
 
  403         """Add an entry for a new task if the task doesn't exist already. If the task exists, 
  404            its UPDATED, NJOBS, STATUS and ONDISK fields will be updated.""" 
  407         updateStatus = self.
getNTasks([
'where DSNAME = ',
DbParam(dsName),
'and TASKNAME=',
DbParam(taskName),
'and STATUS < %i' % TaskManager.StatusCodes[
'POSTPROCRUNNING']])
 
  412             if task[
'TEMPLATE']!=template:
 
  413                 print (
'ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task[
'TEMPLATE'],template))
 
  415                 updateStr = [
'update TASKS set UPDATED =',
DbParam(tstamp),
 
  416                              ', NJOBS = ',
DbParam(task[
'NJOBS']+njobs),
 
  419                 if release 
not in task[
'ATLREL']:
 
  420                     print (
'WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task[
'ATLREL'],release))
 
  421                     release = 
'; '.
join([task[
'ATLREL'],release])
 
  422                     updateStr += [
', ATLREL = ',
DbParam(release)]
 
  425                     updateStr += [
', STATUS = ',
DbParam(status)]
 
  427                 updateStr += [
'where DSNAME = ',
DbParam(dsName),
'and TASKNAME = ',
DbParam(taskName)]
 
  438                 createdHost = os.uname()[1]
 
  440             self.
execute( [
'insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
 
  445                            DbParam(taskpostprocsteps),
',',
 
  459         """Get status of a task.""" 
  460         cursor = self.
execute( [
'select STATUS from TASKS where DSNAME =', 
DbParam(dsName), 
'and TASKNAME =', 
DbParam(taskName) ] )
 
  461         return cursor.fetchone()[0]
 
  464     def setStatus(self,dsName,taskName,status,oldStatus=None):
 
  465         """Set the status of a task. If oldStatus is set, the task must be in the designated status.""" 
  469                                     'where DSNAME =', 
DbParam(dsName), 
'and TASKNAME =', 
DbParam(taskName),
'and STATUS =',
DbParam(oldStatus)], 
True)
 
  472                                     'where DSNAME =', 
DbParam(dsName), 
'and TASKNAME =', 
DbParam(taskName)], 
True)
 
  473         return cursor.rowcount
 
  477         """Update the onDisk-status of a task.""" 
  480                                 'where DSNAME =', 
DbParam(dsName), 
'and TASKNAME =', 
DbParam(taskName)], 
True)
 
  481         return cursor.rowcount
 
  485                      status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
 
  486         """Update task status information including number of jobs in different states.""" 
  488         cursor = self.
execute( [
'update TASKS set UPDATED =',
DbParam(tstamp),
 
  490                                 ', NRESULTFILES =',
DbParam(nResultFiles),
 
  492                                 ', NJOBS_SUBMITTED =',
DbParam(nJobsSubmitted),
 
  493                                 ', NJOBS_RUNNING =',
DbParam(nJobsRunning),
 
  494                                 ', NJOBS_POSTPROC =',
DbParam(nJobsPostProc),
 
  495                                 ', NJOBS_FAILED =',
DbParam(nJobsFailed),
 
  496                                 ', NJOBS_COMPLETED =',
DbParam(nJobsCompleted),
 
  497                                 'where DSNAME =', 
DbParam(dsName), 
'and TASKNAME =', 
DbParam(taskName)], 
True)
 
  498         return cursor.rowcount
 
  501     def setValue(self,dsName,taskName,fieldName,value):
 
  502         """Update a field of a given task.""" 
  504         cursor = self.
execute( [
'update TASKS set UPDATED =',
DbParam(tstamp),
', %s =' % fieldName,
DbParam(value),
 
  505                                 'where DSNAME =', 
DbParam(dsName), 
'and TASKNAME =', 
DbParam(taskName)], 
True)
 
  506         return cursor.rowcount
 
  510         """Get a single value from the task database. If the query results in more than one value, only 
  511            the first value is returned.""" 
  512         q = [ 
'select %s from TASKS' % what ]
 
  514         return self.
execute(q).fetchone()[0]
 
  520         q = [ 
'select %s from TASKS' % what ]
 
  522         return len(self.
execute(q).fetchall())
 
  526         q = [ 
'select count(*) from TASKS' ]
 
  528         return self.
execute(q).fetchone()[0]
 
  531         """Get a single value from the task database. If the query results in more than one value, only 
  532            the first value is returned.""" 
  533         q = [ 
'select %s from TASKS where DSNAME =' % what , 
DbParam(dsName), 
'and TASKNAME =', 
DbParam(taskName)]
 
  534         return self.
execute(q).fetchone()[0]
 
  536     def taskIter(self, what='*', qual=(
'order by UPDATED',)):
 
  537         q = [ 
'select %s from TASKS' % what ]
 
  540         return iter(cursor.fetchone,
None)
 
  543     def taskIterDict(self, what='*', qual=(
'order by UPDATED',), limit=999999999):
 
  550         q = [ 
'select %s from TASKS' % what ]
 
  555             row = cursor.fetchone()
 
  556             if row 
is None: 
break 
  566             q = [ 
'select %s from TASKS where DSNAME =' % what, 
DbParam(dsname), 
'and TASKNAME =', 
DbParam(taskname) ]
 
  571                 q = [ 
'select %s from TASKS where DSNAME =' % what, 
DbParam(dsname), 
'and TASKNAME = ', 
DbParam(taskname), 
'and ROWNUM <= 1 order by UPDATED' ]
 
  573                 q = [ 
'select %s from TASKS where DSNAME =' % what, 
DbParam(dsname), 
'and TASKNAME = ', 
DbParam(taskname), 
'order by UPDATED limit 1' ]
 
  575         row = cursor.fetchone()
 
  583         """Get all DSNAMEs for a given (partial) data set name dsname.""" 
  585         q = [ 
"select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by DSNAME" % dsname ]
 
  586         return self.
execute(q).fetchall()
 
  590         """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname 
  591            may be any partial dataset and task name, respectively.""" 
  594                 q = [ 
"select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
 
  596                 q = [ 
"select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
 
  599                 q = [ 
"select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
 
  601                 q = [ 
"select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
 
  602         return self.
execute(q).fetchall()
 
  606     """TaskAnalyzer is a class for analyzing the files of a given task.""" 
  612         self.
path = jobDir+
'/'+dsName+
'/'+taskName
 
  614         self.
status = TaskManager.StatusCodes[
'UNKNOWN']
 
  615         self.
onDisk = TaskManager.OnDiskCodes[
'UNKNOWN']
 
  626         """Check if task directories are still on disk.""" 
  627         onDisk = os.path.exists(self.
path)
 
  629             self.
onDisk = TaskManager.OnDiskCodes[
'DELETED']
 
  636             self.
nJobs = len(glob.glob(self.
path+
'/*/*.config.py'))
 
  644             self.
status = TaskManager.StatusCodes[
'UNKNOWN']
 
  646                 self.
status = TaskManager.StatusCodes[
'CONFIGURED']
 
  648                 self.
status = TaskManager.StatusCodes[
'COMPLETED']
 
  651                 self.
status = TaskManager.StatusCodes[
'SUBMITTED']
 
  653                 self.
status = TaskManager.StatusCodes[
'RUNNING']
 
  655                 self.
status = TaskManager.StatusCodes[
'POSTPROCESSING']
 
  658                     self.
status = TaskManager.StatusCodes[
'PARTIALFAILED']
 
  660                     self.
status = TaskManager.StatusCodes[
'FAILED']
 
  673     """JobAnalyzer is a class for analyzing the jobs of a given task.""" 
  679         self.
path = jobDir+
'/'+dsName+
'/'+taskName
 
  685             l = os.listdir(self.
path)
 
  692         status = TaskManager.StatusCodes[
'UNKNOWN']
 
  694         p = self.
path+
'/'+jobName
 
  695         if os.path.exists(p):
 
  696             if glob.glob(p+
'/*.config.py'):
 
  697                 status = TaskManager.StatusCodes[
'CONFIGURED']
 
  698             if glob.glob(p+
'/*.SUBMITTED'):
 
  699                 status = TaskManager.StatusCodes[
'SUBMITTED']
 
  707             if glob.glob(p+
'/*.RUNNING'):
 
  708                 status = TaskManager.StatusCodes[
'RUNNING']
 
  709             if glob.glob(p+
'/*.POSTPROCESSING'):
 
  710                 status = TaskManager.StatusCodes[
'POSTPROCESSING']
 
  711             if glob.glob(p+
'/*.COMPLETED'):
 
  712                 status = TaskManager.StatusCodes[
'COMPLETED']
 
  714                 if len(glob.glob(p+
'/*.exit.0'))==0:
 
  715                     status = TaskManager.StatusCodes[
'FAILED']
 
  717                 exitcode = 
open(glob.glob(p+
'/*.exitstatus.dat')[0]).
read()
 
  720             if len(glob.glob(p+
'/*.exit.*'))>len(glob.glob(p+
'/*.exit.0')):
 
  721                 status = TaskManager.StatusCodes[
'FAILED']
 
  722         return (status,exitcode)
 
  728 if __name__ == 
'__main__':