6 TaskManager is a tool for keeping track of JobRunner jobs using a
7 database. TaskManager uses the notion of a task as the primary unit for
8 bookkeeping. Each task includes one or more jobs run using the same
9 JobRunner template and typically resulting from a single invocation of
10 JobRunner through one of the makeDPD.py or runDPD.py
11 scripts. TaskManager keeps track of the status of tasks and associated
12 jobs, runs postprocessing scripts, and accumulates results for later
13 analysis across tasks.
15 Written by Juerg Beringer (LBNL) in 2009.
17 __author__ =
'Juerg Beringer'
18 __version__ =
'TaskManager.py atlas/athena'
22 from InDetBeamSpotExample.Utils
import getRunFromName
23 from InDetBeamSpotExample.Utils
import getUserName
27 """Task manager exception class."""
30 """Task manager exception because of a database error"""
34 """Convert a tuple from a database query into a dictonary."""
36 for idx, col
in enumerate(cursor.description):
42 """Get the key for which dictonary d has an entry with value v.
43 Returns 'Undefined' if there's no such key, if several values are found."""
44 l = [k
for k
in d.keys()
if d[k]==v]
52 """Returns 'ok', 'warn' or 'bad' depending on the value of status."""
58 if status
in statusMap:
59 return statusMap[status]
67 if v
not in s.split():
72 def getFullTaskNames(taskman,dsname,taskname,requireSingleTask=False,confirmWithUser=False,addWildCards=True):
73 """Retrieve the full dataset and task names given a pair of (dsname,task) that may
74 contain wildcards or be just a parital name such as the run number. Depending
75 on the requireSingleTask and confirmWithUser settings a TaskManagerCheckError
76 is raised if there are multiple tasks or if the user doesn't confirm."""
77 taskList = taskman.getTaskNames(dsname,taskname,addWildCards)
79 raise TaskManagerCheckError (
'ERROR: No tasks found for dataset = %s, task = %s' % (dsname,taskname))
80 if requireSingleTask
and len(taskList)!=1:
81 m =
"ERROR: Multiple data set names found for dataset = %s, task = %s\n Please use full dataset or task name from list below, using option -n if necessary:\n\n" % (dsname,taskname)
82 m +=
" %-50s %s\n" % (
'DATASET NAME',
'TASK NAME')
83 m +=
" %s\n" % (75*
'-')
85 m +=
" %-50s %s\n" % (t[0],t[1])
87 raise TaskManagerCheckError (m)
89 print (
'Please confirm that you want to execute this command for the following tasks:\n')
90 print (
" %-50s %s" % (
'DATASET NAME',
'TASK NAME'))
91 print (
" %s" % (75*
'-'))
93 print (
" %-50s %s" % (t[0],t[1]))
94 a = input(
'\nARE YOU SURE [n] ? ')
96 raise TaskManagerCheckError (
'ERROR: Aborted by user')
102 """Read config dict from job files."""
104 configFile = glob.glob(
'%s/%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName,
'*.config.py.final.py'))
106 configFile = glob.glob(
'%s/%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName,
'*.config.py'))
109 return config[
'jobConfig']
113 """A utility class to hold a single parameter to be used in SQL queries."""
117 return '%r' % self.
value
121 """TaskManager is tool for keeping track of JobRunner jobs."""
123 StatusCodes = {
'UNKNOWN': -1,
131 'POSTPROCRUNNING': 6,
136 OnDiskCodes = {
'UNKNOWN': -1,
146 connstring = os.environ.get(
'TASKDB',
'sqlite_file:taskdata.db')
149 dbtype, dbname = connstring.split(
':', 1)
151 raise ValueError (
'Illegal database connection string {}'.
format(connstring))
153 if dbtype ==
'auth_file':
157 with open(authfile)
as af:
158 connstring = af.read().strip()
159 dbtype, dbname = connstring.split(
':', 1)
161 raise ValueError (
'Invalid authorization file {} (not readable or invalid format)'.
format(authfile))
163 return dbtype, dbname
166 def __init__(self, connstring='', createDatabase=False):
167 """Constructor. connstring is a connection string specifying either a SQLite file
168 ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
169 ("auth_file:...") with connection information. If connstring is empty, the connection
170 information will be taken from TASKDB (if set), or otherwise defaults to
171 'sqlite_file:taskdata.db'."""
182 dbexists = os.access(self.
dbname, os.F_OK)
183 if dbexists
and createDatabase:
184 raise ValueError (
'SQLite file {} exists already - remove before recreating'.
format(self.
dbname))
185 if not (dbexists
or createDatabase):
186 raise ValueError (
'TaskManager database not found (SQLite file {})'.
format(self.
dbname))
190 elif self.
dbtype ==
'oracle':
196 print (
'ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
202 raise ValueError (
'Unknown database type: {}'.
format(self.
dbtype))
205 ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
209 def __exit__(self, exc_type, exc_value, traceback):
210 ''' Close the database connection at the end of the 'with' statement. '''
214 print (
'ERROR: Unable to close database connection')
217 """Create the database schema for a SQLite3 database."""
220 TASKID integer primary key autoincrement,
225 TASKPOSTPROCSTEPS text,
235 NRESULTFILES integer default 0,
236 NJOBS integer default 0,
237 NJOBS_SUBMITTED integer default -1,
238 NJOBS_RUNNING integer default -1,
239 NJOBS_POSTPROC integer default -1,
240 NJOBS_FAILED integer default -1,
241 NJOBS_COMPLETED integer default -1,
245 STATUS_POSTPROC integer,
254 """Create the database schema for an Oracle database."""
259 except Exception
as e:
264 DSNAME varchar2(256),
266 TASKNAME varchar2(256),
267 TEMPLATE varchar2(256),
268 TASKPOSTPROCSTEPS varchar2(256),
269 ATLREL varchar2(256),
271 CREATED_USER varchar2(256),
272 CREATED_HOST varchar2(256),
276 LOGFILES varchar2(256),
277 COOLTAGS varchar2(256),
278 NRESULTFILES number(10,0) default 0,
279 NJOBS number(10,0) default 0,
280 NJOBS_SUBMITTED number(10,0) default -1,
281 NJOBS_RUNNING number(10,0) default -1,
282 NJOBS_POSTPROC number(10,0) default -1,
283 NJOBS_FAILED number(10,0) default -1,
284 NJOBS_COMPLETED number(10,0) default -1,
285 RESULTFILES varchar2(4000),
286 RESULTLINKS varchar2(4000),
287 TASKCOMMENT varchar2(4000),
288 STATUS_POSTPROC number(5,0) default -1,
289 BASEPATH varchar2(256),
290 AUXDATA varchar2(4000),
291 constraint BEAMSPOT_TASKS_PK primary key (TASKID)
296 self.
dbcon.
cursor().
execute(
'alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
297 self.
dbcon.
cursor().
execute(
'create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
301 create or replace trigger TASKS_ID_TRIGGER
302 before insert on TASKS
305 select TASKS_SEQ.nextval into :new.TASKID from dual;
311 def execute(self,statementParts, doCommit=False, limit=None):
312 """Execute a SQL statement passed as a list or tuple of statement parts, where each
313 part is either a partial SQL string, or an object of type DbParam specifying a parameter
314 for the SQL statement. The actual SQL statement is assembled (using the parameter style
315 of the current database) and executed, and the resulting cursor is returned.
316 Loosely follows the method discussed in the Python Cookbook.
317 WARNING: At present, limit doesn't work when ordering rows for Oracle!"""
319 print (
'**WARN** TaskManager will keep the database connection open until it is deleted.')
320 print (
'**INFO** TaskManager should generally only be used inside a with statement:')
321 print (
'**INFO** with TaskManager(...) as taskman:')
322 print (
'**INFO** # do something ...')
324 if not statementParts:
326 if isinstance(statementParts,str):
327 raise TypeError (
'Must pass list or tuple to TaskManager.execute')
328 for p
in statementParts:
329 if not (isinstance(p,DbParam)
or isinstance(p,str)):
330 raise ValueError (
'Can only pass SQL string fragments and DbParam objects in list'
331 'to TaskManager.execute, found %s with "%s"' % (
type(p),
str(p)))
338 for p
in statementParts:
339 if isinstance(p,DbParam):
341 params.append(p.value)
348 for p
in statementParts:
349 if isinstance(p,DbParam):
350 name =
'p%d' % len(params)
351 sqlParts.append(
':%s' % name)
352 params[name] = p.value
357 raise ValueError (
'Unknown SQL parameter style %s' % self.
paramstyle)
359 sql =
' '.
join(sqlParts)
365 if 'order by' in sql:
366 sqlParts2 = sql.split(
'order by')
367 sql =
'%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
369 sql +=
' and ROWNUM <= %i' % limit
371 sql +=
' limit %i' % limit
374 print (
'\nExecuting SQL statement: ',sql)
375 print (
' with parameters: ',params)
379 cursor.execute(sql,params)
382 except Exception
as e:
383 msg =
'\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
384 raise TaskManagerDatabaseError (msg)
389 """Delete a task entry from the taskmanager database."""
390 q = [
'delete from TASKS where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName) ]
392 return cursor.rowcount
395 def addTask(self,dsName,taskName,template,release,njobs,
396 taskpostprocsteps='',
397 status=StatusCodes['SUBMITTED'],
398 onDisk=OnDiskCodes['ALLONDISK'],
403 """Add an entry for a new task if the task doesn't exist already. If the task exists,
404 its UPDATED, NJOBS, STATUS and ONDISK fields will be updated."""
407 updateStatus = self.
getNTasks([
'where DSNAME = ',
DbParam(dsName),
'and TASKNAME=',
DbParam(taskName),
'and STATUS < %i' % TaskManager.StatusCodes[
'POSTPROCRUNNING']])
412 if task[
'TEMPLATE']!=template:
413 print (
'ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task[
'TEMPLATE'],template))
415 updateStr = [
'update TASKS set UPDATED =',
DbParam(tstamp),
416 ', NJOBS = ',
DbParam(task[
'NJOBS']+njobs),
419 if release
not in task[
'ATLREL']:
420 print (
'WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task[
'ATLREL'],release))
421 release =
'; '.
join([task[
'ATLREL'],release])
422 updateStr += [
', ATLREL = ',
DbParam(release)]
425 updateStr += [
', STATUS = ',
DbParam(status)]
427 updateStr += [
'where DSNAME = ',
DbParam(dsName),
'and TASKNAME = ',
DbParam(taskName)]
438 createdHost = os.uname()[1]
440 self.
execute( [
'insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
445 DbParam(taskpostprocsteps),
',',
459 """Get status of a task."""
460 cursor = self.
execute( [
'select STATUS from TASKS where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName) ] )
461 return cursor.fetchone()[0]
464 def setStatus(self,dsName,taskName,status,oldStatus=None):
465 """Set the status of a task. If oldStatus is set, the task must be in the designated status."""
469 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName),
'and STATUS =',
DbParam(oldStatus)],
True)
472 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
473 return cursor.rowcount
477 """Update the onDisk-status of a task."""
480 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
481 return cursor.rowcount
485 status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
486 """Update task status information including number of jobs in different states."""
488 cursor = self.
execute( [
'update TASKS set UPDATED =',
DbParam(tstamp),
490 ', NRESULTFILES =',
DbParam(nResultFiles),
492 ', NJOBS_SUBMITTED =',
DbParam(nJobsSubmitted),
493 ', NJOBS_RUNNING =',
DbParam(nJobsRunning),
494 ', NJOBS_POSTPROC =',
DbParam(nJobsPostProc),
495 ', NJOBS_FAILED =',
DbParam(nJobsFailed),
496 ', NJOBS_COMPLETED =',
DbParam(nJobsCompleted),
497 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
498 return cursor.rowcount
501 def setValue(self,dsName,taskName,fieldName,value):
502 """Update a field of a given task."""
504 cursor = self.
execute( [
'update TASKS set UPDATED =',
DbParam(tstamp),
', %s =' % fieldName,
DbParam(value),
505 'where DSNAME =',
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)],
True)
506 return cursor.rowcount
510 """Get a single value from the task database. If the query results in more than one value, only
511 the first value is returned."""
512 q = [
'select %s from TASKS' % what ]
514 return self.
execute(q).fetchone()[0]
520 q = [
'select %s from TASKS' % what ]
522 return len(self.
execute(q).fetchall())
526 q = [
'select count(*) from TASKS' ]
528 return self.
execute(q).fetchone()[0]
531 """Get a single value from the task database. If the query results in more than one value, only
532 the first value is returned."""
533 q = [
'select %s from TASKS where DSNAME =' % what ,
DbParam(dsName),
'and TASKNAME =',
DbParam(taskName)]
534 return self.
execute(q).fetchone()[0]
536 def taskIter(self, what='*', qual=(
'order by UPDATED',)):
537 q = [
'select %s from TASKS' % what ]
540 return iter(cursor.fetchone,
None)
543 def taskIterDict(self, what='*', qual=(
'order by UPDATED',), limit=999999999):
550 q = [
'select %s from TASKS' % what ]
555 row = cursor.fetchone()
556 if row
is None:
break
566 q = [
'select %s from TASKS where DSNAME =' % what,
DbParam(dsname),
'and TASKNAME =',
DbParam(taskname) ]
571 q = [
'select %s from TASKS where DSNAME =' % what,
DbParam(dsname),
'and TASKNAME = ',
DbParam(taskname),
'and ROWNUM <= 1 order by UPDATED' ]
573 q = [
'select %s from TASKS where DSNAME =' % what,
DbParam(dsname),
'and TASKNAME = ',
DbParam(taskname),
'order by UPDATED limit 1' ]
575 row = cursor.fetchone()
583 """Get all DSNAMEs for a given (partial) data set name dsname."""
585 q = [
"select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by DSNAME" % dsname ]
586 return self.
execute(q).fetchall()
590 """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
591 may be any partial dataset and task name, respectively."""
594 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
596 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
599 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
601 q = [
"select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
602 return self.
execute(q).fetchall()
606 """TaskAnalyzer is a class for analyzing the files of a given task."""
612 self.
path = jobDir+
'/'+dsName+
'/'+taskName
614 self.
status = TaskManager.StatusCodes[
'UNKNOWN']
615 self.
onDisk = TaskManager.OnDiskCodes[
'UNKNOWN']
626 """Check if task directories are still on disk."""
627 onDisk = os.path.exists(self.
path)
629 self.
onDisk = TaskManager.OnDiskCodes[
'DELETED']
636 self.
nJobs = len(glob.glob(self.
path+
'/*/*.config.py'))
644 self.
status = TaskManager.StatusCodes[
'UNKNOWN']
646 self.
status = TaskManager.StatusCodes[
'CONFIGURED']
648 self.
status = TaskManager.StatusCodes[
'COMPLETED']
651 self.
status = TaskManager.StatusCodes[
'SUBMITTED']
653 self.
status = TaskManager.StatusCodes[
'RUNNING']
655 self.
status = TaskManager.StatusCodes[
'POSTPROCESSING']
658 self.
status = TaskManager.StatusCodes[
'PARTIALFAILED']
660 self.
status = TaskManager.StatusCodes[
'FAILED']
673 """JobAnalyzer is a class for analyzing the jobs of a given task."""
679 self.
path = jobDir+
'/'+dsName+
'/'+taskName
685 l = os.listdir(self.
path)
692 status = TaskManager.StatusCodes[
'UNKNOWN']
694 p = self.
path+
'/'+jobName
695 if os.path.exists(p):
696 if glob.glob(p+
'/*.config.py'):
697 status = TaskManager.StatusCodes[
'CONFIGURED']
698 if glob.glob(p+
'/*.SUBMITTED'):
699 status = TaskManager.StatusCodes[
'SUBMITTED']
707 if glob.glob(p+
'/*.RUNNING'):
708 status = TaskManager.StatusCodes[
'RUNNING']
709 if glob.glob(p+
'/*.POSTPROCESSING'):
710 status = TaskManager.StatusCodes[
'POSTPROCESSING']
711 if glob.glob(p+
'/*.COMPLETED'):
712 status = TaskManager.StatusCodes[
'COMPLETED']
714 if len(glob.glob(p+
'/*.exit.0'))==0:
715 status = TaskManager.StatusCodes[
'FAILED']
717 exitcode =
open(glob.glob(p+
'/*.exitstatus.dat')[0]).
read()
720 if len(glob.glob(p+
'/*.exit.*'))>len(glob.glob(p+
'/*.exit.0')):
721 status = TaskManager.StatusCodes[
'FAILED']
722 return (status,exitcode)
728 if __name__ ==
'__main__':