6 TaskManager is a tool for keeping track of JobRunner jobs using a
7 database. TaskManager uses the notion of a task as the primary unit for
8 bookkeeping. Each task includes one or more jobs run using the same
9 JobRunner template and typically resulting from a single invocation of
10 JobRunner through one of the makeDPD.py or runDPD.py
11 scripts. TaskManager keeps track of the status of tasks and associated
12 jobs, runs postprocessing scripts, and accumulates results for later
13 analysis accross tasks.
15 Written by Juerg Beringer (LBNL) in 2009.
17 __author__ =
'Juerg Beringer'
18 __version__ =
'TaskManager.py atlas/athena'
22 from InDetBeamSpotExample.Utils
import getRunFromName
23 from InDetBeamSpotExample.Utils
import getUserName
29 """Task manager exception class."""
32 """Task manager exception because of a database error"""
36 """Convert a tuple from a database query into a dictonary."""
38 for idx, col
in enumerate(cursor.description):
44 """Get the key for which dictonary d has an entry with value v.
45 Returns 'Undefined' if there's no such key, if several values are found."""
46 l = [k
for k
in d.keys()
if d[k]==v]
54 """Returns 'ok', 'warn' or 'bad' depending on the value of status."""
60 if status
in statusMap:
61 return statusMap[status]
69 if v
not in s.split():
74 def getFullTaskNames(taskman,dsname,taskname,requireSingleTask=False,confirmWithUser=False,addWildCards=True):
75 """Retrieve the full dataset and task names given a pair of (dsname,task) that may
76 contain wildcards or be just a parital name such as the run number. Depending
77 on the requireSingleTask and confirmWithUser settings a TaskManagerCheckError
78 is raised if there are multiple tasks or if the user doesn't confirm."""
79 taskList = taskman.getTaskNames(dsname,taskname,addWildCards)
81 raise TaskManagerCheckError (
'ERROR: No tasks found for dataset = %s, task = %s' % (dsname,taskname))
82 if requireSingleTask
and len(taskList)!=1:
83 m =
"ERROR: Multiple data set names found for dataset = %s, task = %s\n Please use full dataset or task name from list below, using option -n if necessary:\n\n" % (dsname,taskname)
84 m +=
" %-50s %s\n" % (
'DATASET NAME',
'TASK NAME')
85 m +=
" %s\n" % (75*
'-')
87 m +=
" %-50s %s\n" % (t[0],t[1])
89 raise TaskManagerCheckError (m)
91 print (
'Please confirm that you want to execute this command for the following tasks:\n')
92 print (
" %-50s %s" % (
'DATASET NAME',
'TASK NAME'))
93 print (
" %s" % (75*
'-'))
95 print (
" %-50s %s" % (t[0],t[1]))
96 a =
input(
'\nARE YOU SURE [n] ? ')
98 raise TaskManagerCheckError (
'ERROR: Aborted by user')
104 """Read config dict from job files."""
106 configFile = glob.glob(
'%s/%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName,
'*.config.py.final.py'))
108 configFile = glob.glob(
'%s/%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName,
'*.config.py'))
111 return config[
'jobConfig']
115 """A utility class to hold a single parameter to be used in SQL queries."""
119 return '%r' % self.
value
123 """TaskManager is tool for keeping track of JobRunner jobs."""
125 StatusCodes = {
'UNKNOWN': -1,
133 'POSTPROCRUNNING': 6,
138 OnDiskCodes = {
'UNKNOWN': -1,
148 connstring = os.environ.get(
'TASKDB',
'sqlite_file:taskdata.db')
151 dbtype, dbname = connstring.split(
':', 1)
153 raise ValueError (
'Illegal database connection string {}'.
format(connstring))
155 if dbtype ==
'auth_file':
159 with open(authfile)
as af:
160 connstring = af.read().strip()
161 dbtype, dbname = connstring.split(
':', 1)
163 raise ValueError (
'Invalid authorization file {} (not readable or invalid format)'.
format(authfile))
165 return dbtype, dbname
168 def __init__(self, connstring='', createDatabase=False):
169 """Constructor. connstring is a connection string specifying either a SQLite file
170 ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
171 ("auth_file:...") with connection information. If connstring is empty, the connection
172 information will be taken from TASKDB (if set), or otherwise defaults to
173 'sqlite_file:taskdata.db'."""
184 dbexists = os.access(self.
dbname, os.F_OK)
185 if dbexists
and createDatabase:
186 raise ValueError (
'SQLite file {} exists already - remove before recreating'.
format(self.
dbname))
187 if not (dbexists
or createDatabase):
188 raise ValueError (
'TaskManager database not found (SQLite file {})'.
format(self.
dbname))
192 elif self.
dbtype ==
'oracle':
198 print (
'ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
204 raise ValueError (
'Unknown database type: {}'.
format(self.
dbtype))
207 ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
211 def __exit__(self, exc_type, exc_value, traceback):
212 ''' Close the database connection at the end of the 'with' statement. '''
216 print (
'ERROR: Unable to close database connection')
219 """Create the database schema for a SQLite3 database."""
222 TASKID integer primary key autoincrement,
227 TASKPOSTPROCSTEPS text,
237 NRESULTFILES integer default 0,
238 NJOBS integer default 0,
239 NJOBS_SUBMITTED integer default -1,
240 NJOBS_RUNNING integer default -1,
241 NJOBS_POSTPROC integer default -1,
242 NJOBS_FAILED integer default -1,
243 NJOBS_COMPLETED integer default -1,
247 STATUS_POSTPROC integer,
256 """Create the database schema for an Oracle database."""
261 except Exception
as e:
266 DSNAME varchar2(256),
268 TASKNAME varchar2(256),
269 TEMPLATE varchar2(256),
270 TASKPOSTPROCSTEPS varchar2(256),
271 ATLREL varchar2(256),
273 CREATED_USER varchar2(256),
274 CREATED_HOST varchar2(256),
278 LOGFILES varchar2(256),
279 COOLTAGS varchar2(256),
280 NRESULTFILES number(10,0) default 0,
281 NJOBS number(10,0) default 0,
282 NJOBS_SUBMITTED number(10,0) default -1,
283 NJOBS_RUNNING number(10,0) default -1,
284 NJOBS_POSTPROC number(10,0) default -1,
285 NJOBS_FAILED number(10,0) default -1,
286 NJOBS_COMPLETED number(10,0) default -1,
287 RESULTFILES varchar2(4000),
288 RESULTLINKS varchar2(4000),
289 TASKCOMMENT varchar2(4000),
290 STATUS_POSTPROC number(5,0) default -1,
291 BASEPATH varchar2(256),
292 AUXDATA varchar2(4000),
293 constraint BEAMSPOT_TASKS_PK primary key (TASKID)
298 self.
dbcon.
cursor().
execute(
'alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
299 self.
dbcon.
cursor().
execute(
'create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
303 create or replace trigger TASKS_ID_TRIGGER
304 before insert on TASKS
307 select TASKS_SEQ.nextval into :new.TASKID from dual;
313 def execute(self,statementParts, doCommit=False, limit=None):
314 """Execute a SQL statement passed as a list or tuple of statement parts, where each
315 part is either a partial SQL string, or an object of type DbParam specifying a parameter
316 for the SQL statement. The actual SQL statement is assembled (using the parameter style
317 of the current database) and executed, and the resulting cursor is returned.
318 Loosely follows the method discussed in the Python Cookbook.
319 WARNING: At present, limit doesn't work when ordering rows for Oracle!"""
321 print (
'**WARN** TaskManager will keep the database connection open until it is deleted.')
322 print (
'**INFO** TaskManager should generally only be used inside a with statement:')
323 print (
'**INFO** with TaskManager(...) as taskman:')
324 print (
'**INFO** # do something ...')
326 if not statementParts:
328 if isinstance(statementParts,str):
329 raise TypeError (
'Must pass list or tuple to TaskManager.execute')
330 for p
in statementParts:
331 if not (isinstance(p,DbParam)
or isinstance(p,six.string_types)):
332 raise ValueError (
'Can only pass SQL string fragments and DbParam objects in list'
333 'to TaskManager.execute, found %s with "%s"' % (
type(p),
str(p)))
340 for p
in statementParts:
341 if isinstance(p,DbParam):
343 params.append(p.value)
350 for p
in statementParts:
351 if isinstance(p,DbParam):
352 name =
'p%d' % len(params)
353 sqlParts.append(
':%s' % name)
354 params[name] = p.value
359 raise ValueError (
'Unknown SQL parameter style %s' % self.
paramstyle)
361 sql =
' '.
join(sqlParts)
367 if 'order by' in sql:
368 sqlParts2 = sql.split(
'order by')
369 sql =
'%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
371 sql +=
' and ROWNUM <= %i' % limit
373 sql +=
' limit %i' % limit
376 print (
'\nExecuting SQL statement: ',sql)
377 print (
' with parameters: ',params)
381 cursor.execute(sql,params)
384 except Exception
as e:
385 msg =
'\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
386 raise TaskManagerDatabaseError (msg)
391 """Delete a task entry from the taskmanager database."""
392 q = [
'delete from TASKS where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName) ]
394 return cursor.rowcount
397 def addTask(self,dsName,taskName,template,release,njobs,
398 taskpostprocsteps='',
399 status=StatusCodes['SUBMITTED'],
400 onDisk=OnDiskCodes['ALLONDISK'],
405 """Add an entry for a new task if the task doesn't exist already. If the task exists,
406 its UPDATED, NJOBS, STATUS and ONDISK fields will be updated."""
409 updateStatus = self.
getNTasks([
'where DSNAME = ',
DbParam(dsName),
'and TASKNAME=',
DbParam(taskName),
'and STATUS < %i' % TaskManager.StatusCodes[
'POSTPROCRUNNING']])
414 if task[
'TEMPLATE']!=template:
415 print (
'ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task[
'TEMPLATE'],template))
417 updateStr = [
'update TASKS set UPDATED =',
DbParam(tstamp),
418 ', NJOBS = ',
DbParam(task[
'NJOBS']+njobs),
421 if release
not in task[
'ATLREL']:
422 print (
'WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task[
'ATLREL'],release))
423 release =
'; '.
join([task[
'ATLREL'],release])
424 updateStr += [
', ATLREL = ',
DbParam(release)]
427 updateStr += [
', STATUS = ',
DbParam(status)]
429 updateStr += [
'where DSNAME = ',
DbParam(dsName),
'and TASKNAME = ',
DbParam(taskName)]
440 createdHost = os.uname()[1]
442 self.
execute( [
'insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
447 DbParam(taskpostprocsteps),
',',
461 """Get status of a task."""
462 cursor = self.
execute( [
'select STATUS from TASKS where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName) ] )
463 return cursor.fetchone()[0]
466 def setStatus(self,dsName,taskName,status,oldStatus=None):
467 """Set the status of a task. If oldStatus is set, the task must be in the designated status."""
471 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName),
'and STATUS =',
DbParam(oldStatus)],
True)
474 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
475 return cursor.rowcount
479 """Update the onDisk-status of a task."""
482 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
483 return cursor.rowcount
487 status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
488 """Update task status information including number of jobs in different states."""
490 cursor = self.
execute( [
'update TASKS set UPDATED =',
DbParam(tstamp),
492 ', NRESULTFILES =',
DbParam(nResultFiles),
494 ', NJOBS_SUBMITTED =',
DbParam(nJobsSubmitted),
495 ', NJOBS_RUNNING =',
DbParam(nJobsRunning),
496 ', NJOBS_POSTPROC =',
DbParam(nJobsPostProc),
497 ', NJOBS_FAILED =',
DbParam(nJobsFailed),
498 ', NJOBS_COMPLETED =',
DbParam(nJobsCompleted),
499 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
500 return cursor.rowcount
503 def setValue(self,dsName,taskName,fieldName,value):
504 """Update a field of a given task."""
506 cursor = self.
execute( [
'update TASKS set UPDATED =',
DbParam(tstamp),
', %s =' % fieldName,
DbParam(value),
507 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
508 return cursor.rowcount
512 """Get a single value from the task database. If the query results in more than one value, only
513 the first value is returned."""
514 q = [
'select %s from TASKS' % what ]
516 return self.
execute(q).fetchone()[0]
522 q = [
'select %s from TASKS' % what ]
524 return len(self.
execute(q).fetchall())
528 q = [
'select count(*) from TASKS' ]
530 return self.
execute(q).fetchone()[0]
533 """Get a single value from the task database. If the query results in more than one value, only
534 the first value is returned."""
535 q = [
'select %s from TASKS where DSNAME =' % what ,
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)]
536 return self.
execute(q).fetchone()[0]
538 def taskIter(self, what='*', qual=(
'order by UPDATED',)):
539 q = [
'select %s from TASKS' % what ]
542 return iter(cursor.fetchone,
None)
545 def taskIterDict(self, what='*', qual=(
'order by UPDATED',), limit=999999999):
552 q = [
'select %s from TASKS' % what ]
557 row = cursor.fetchone()
558 if row
is None:
break
568 q = [
'select %s from TASKS where DSNAME =' % what,
DbParam(dsname),
'and TASKNAME =',
DbParam(taskname) ]
573 q = [
'select %s from TASKS where DSNAME =' % what,
DbParam(dsname),
'and TASKNAME = ',
DbParam(taskname),
'and ROWNUM <= 1 order by UPDATED' ]
575 q = [
'select %s from TASKS where DSNAME =' % what,
DbParam(dsname),
'and TASKNAME = ',
DbParam(taskname),
'order by UPDATED limit 1' ]
577 row = cursor.fetchone()
585 """Get all DSNAMEs for a given (partial) data set name dsname."""
587 q = [
"select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by DSNAME" % dsname ]
588 return self.
execute(q).fetchall()
592 """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
593 may be any partial dataset and task name, respectively."""
596 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
598 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
601 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
603 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
604 return self.
execute(q).fetchall()
608 """TaskAnalyzer is a class for analyzing the files of a given task."""
614 self.
path = jobDir+
'/'+dsName+
'/'+taskName
616 self.
status = TaskManager.StatusCodes[
'UNKNOWN']
617 self.
onDisk = TaskManager.OnDiskCodes[
'UNKNOWN']
628 """Check if task directories are still on disk."""
629 onDisk = os.path.exists(self.
path)
631 self.
onDisk = TaskManager.OnDiskCodes[
'DELETED']
638 self.
nJobs = len(glob.glob(self.
path+
'/*/*.config.py'))
646 self.
status = TaskManager.StatusCodes[
'UNKNOWN']
648 self.
status = TaskManager.StatusCodes[
'CONFIGURED']
650 self.
status = TaskManager.StatusCodes[
'COMPLETED']
653 self.
status = TaskManager.StatusCodes[
'SUBMITTED']
655 self.
status = TaskManager.StatusCodes[
'RUNNING']
657 self.
status = TaskManager.StatusCodes[
'POSTPROCESSING']
660 self.
status = TaskManager.StatusCodes[
'PARTIALFAILED']
662 self.
status = TaskManager.StatusCodes[
'FAILED']
675 """JobAnalyzer is a class for analyzing the jobs of a given task."""
681 self.
path = jobDir+
'/'+dsName+
'/'+taskName
687 l = os.listdir(self.
path)
694 status = TaskManager.StatusCodes[
'UNKNOWN']
696 p = self.
path+
'/'+jobName
697 if os.path.exists(p):
698 if glob.glob(p+
'/*.config.py'):
699 status = TaskManager.StatusCodes[
'CONFIGURED']
700 if glob.glob(p+
'/*.SUBMITTED'):
701 status = TaskManager.StatusCodes[
'SUBMITTED']
709 if glob.glob(p+
'/*.RUNNING'):
710 status = TaskManager.StatusCodes[
'RUNNING']
711 if glob.glob(p+
'/*.POSTPROCESSING'):
712 status = TaskManager.StatusCodes[
'POSTPROCESSING']
713 if glob.glob(p+
'/*.COMPLETED'):
714 status = TaskManager.StatusCodes[
'COMPLETED']
716 if len(glob.glob(p+
'/*.exit.0'))==0:
717 status = TaskManager.StatusCodes[
'FAILED']
719 exitcode =
open(glob.glob(p+
'/*.exitstatus.dat')[0]).
read()
722 if len(glob.glob(p+
'/*.exit.*'))>len(glob.glob(p+
'/*.exit.0')):
723 status = TaskManager.StatusCodes[
'FAILED']
724 return (status,exitcode)
730 if __name__ ==
'__main__':