7 taskman is a command line utility to run TaskManager functions.
9 __author__ =
'Juerg Beringer'
10 __version__ =
'taskman.py atlas/athena'
11 __usage__ =
'''%prog [options] taskdbconn command [args ...]
15 init Create a new database
16 checkdb Check (and fix) sqlite database
17 checkdup Check task database for duplicate entries
18 debug Interactive debugging (EXPERTS ONLY!)
19 dump [DSNAME TASKNAME] Dump all or selected tasks in database
20 show DSNAME [TASKNAME] Show list of tasks
21 update Update information of ongoing tasks
22 update DSNAME TASKNAME Update information of selected tasks
23 rebuild [DSNAME TASKNAME] Rebuild missing or selected database entries from job files
24 import [DSNAME TASKNAME] DBCONN Import all or selected tasks from database DBCONN (duplicates not checked)
25 setstatus DSNAME TASKNAME STATUS Set status of selected tasks to STATUS
26 setfield DSNAME TASKNAME FIELDNAME VALUE Set database field to value (experts only!)
27 delete DSNAME TASKNAME Delete entry for selected task(s) (task files are NOT removed)
28 deleteResults DSNAME TASKNAME Delete postprocessing results for selected set of tasks
29 deleteTask DSNAME TASKNAME Delete task files and corresponding TaskManager entry
30 for a single task (will prompt for confirmation unless
31 option --batch is used)
37 from InDetBeamSpotExample.Utils
import getRunFromName
42 from optparse
import OptionParser
# Command line options.
# --pretty / --no-pretty share the 'pretty' destination: None (neither flag
# given) means "auto", which the dump command treats like --pretty.
parser = OptionParser(usage=__usage__, version=__version__)
parser.add_option('-b', '--batch', dest='batch', action='store_true', default=False,
                  help='batch mode - never ask for confirmation')
parser.add_option('-v', '--verbose', dest='verbose', action='store_true', default=False,
                  help='enable more verbose output for some commands')
parser.add_option('-n', '--nowildcards', dest='nowildcards', action='store_true', default=False,
                  help='do not add wildcards when looking up dataset and task names')
parser.add_option('-d', '--dbconn', dest='dbconn', default='',
                  help='task manager database connection string (default: check TASKDB, otherwise use sqlite_file:taskdata.db)')
# Fixed: the help text was missing its closing parenthesis.
parser.add_option('', '--proddir', dest='proddir', default='.',
                  help='production directory (default: ".")')
parser.add_option('-p', '--pretty', dest='pretty', action='store_true', default=None,
                  help='try to nicely format output (default: auto)')
parser.add_option('', '--no-pretty', dest='pretty', action='store_false',
                  help='do not attempt to format output')
# Used by notifyFailed as a substring match on TASKNAME.
parser.add_option('', '--runtaskname', dest='runtaskname', default='CB_BEAMSPOT',
                  help='task name')
(options,args) = parser.parse_args()
56 parser.error(
'wrong number of command line arguments')
# Directory under which task directories live as <proddir>/<dsname>/<taskname>.
# Several commands below build paths from it or glob inside it.
proddir = options.proddir
# The error message promises an existing, readable *directory*, so check
# exactly that. A plain existence test would let a regular file or an
# unreadable directory through, and later glob/path lookups would then
# silently find nothing instead of failing up front.
if not (os.path.isdir(proddir) and os.access(proddir, os.R_OK | os.X_OK)):
    sys.exit('ERROR: Job directory %s does not exist or is unreadable' % proddir)
65 ''' Open task manager '''
66 dbconn = dbconn
or options.dbconn
68 return TaskManager(dbconn)
70 print (
'ERROR: Unable to access task manager database %s' % dbconn)
77 if cmdargs: parser.error(
'Command init does not take arguments')
79 dbtype, dbname = TaskManager.parseConnectionInfo(options.dbconn)
80 except ValueError
as e:
81 sys.exit(
'ERROR: {}'.
format(e))
82 print (
'Will initialise schema for database: {}:{}'.
format(dbtype, dbname))
85 print (
'Checking for existing database ...')
87 with TaskManager(options.dbconn):
pass
88 except ValueError
as e:
89 print (
'Test connection failed: {}'.
format(e))
91 print (
'Test connection succeeded: the database already exists!')
94 a = raw_input(
'\nRECREATING TASK DATABASE SCHEMA - ANY EXISTING DATA WILL BE ERASED!\n\nARE YOU REALLY ABSOLUTELY SURE [n] ? ')
95 if a !=
'y': sys.exit(
'ERROR: Rebuilding aborted by user')
97 with TaskManager(options.dbconn, createDatabase=
True):
pass
# Command "checkdb": run sqlite3's 'pragma integrity_check' on the SQLite task
# database file. If errors are reported, optionally (after confirmation unless
# --batch) run 'vacuum' and re-check. Only sqlite_file connections are supported.
105 if cmdargs: parser.error(
'Command checkdb does not take arguments')
107 dbtype, dbfile = TaskManager.parseConnectionInfo(options.dbconn)
# NOTE(review): typo 'explicity' in the message below (should be 'explicit').
109 sys.exit(
"ERROR: Illegal or empty/default database connection string - must provide explicity SQLite file reference")
110 if dbtype !=
'sqlite_file':
111 sys.exit(
'ERROR: checkdb is only supported for SQLite databases')
112 if not os.path.exists(dbfile):
113 sys.exit(
'ERROR: SQLite file %s does not exist' % dbfile)
# NOTE(review): dbfile comes from the user-supplied connection string and is
# interpolated unquoted into a shell command line (here and in the vacuum /
# re-check calls below). A path with spaces or shell metacharacters breaks or
# injects into the command. Prefer an argument list without a shell, e.g.
# subprocess.run(['sqlite3', dbfile, 'pragma integrity_check;'],
#                capture_output=True, text=True).
115 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'pragma integrity_check;'" % dbfile)
117 sys.exit(
'ERROR: Error executing sqlite3 command')
119 print (
'ERROR: SQLite database file has errors:')
123 if not options.batch:
# NOTE(review): raw_input() exists only in Python 2, but this script relies on
# Python-3-only subprocess.getstatusoutput, and deleteTask uses input().
# Unless a compatibility alias is defined in a part of the file not shown,
# this raises NameError under Python 3. The same applies to the init and
# rebuild prompts.
124 a = raw_input(
'Do you want to try VACUUM [n] ? ')
126 sys.exit(
'\nERROR: VACUUM not executed - please fix database file manually')
127 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'vacuum;'" % dbfile)
130 sys.exit(
'ERROR: VACUUM failed')
# Re-run the integrity check after VACUUM. sqlite3 prints exactly 'ok' for a
# healthy file.
132 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'pragma integrity_check;'" % dbfile)
133 if status != 0
or output !=
'ok':
135 sys.exit(
'ERROR: Integrity check still failed - please check')
136 print (
'INFO: SQLite file now passes integrity test (content may still have errors)')
139 print (
'INFO: SQLite file {} ok'.
format(dbfile))
146 if cmd ==
'checkdup':
147 if cmdargs: parser.error(
'Command checkdup does not take arguments')
151 for t
in taskman.taskIter(
'dsname,taskname,count(*)', [
'group by dsname,taskname']):
154 print (
'Duplicate task: {} / {}'.
format(t[0], t[1]))
155 for d
in taskman.taskIterDict(qual=[
156 'where DSNAME =', DbParam(t[0]),
157 'and TASKNAME =', DbParam(t[1]),
159 print (
' TASKID =', d[
'TASKID'])
160 print (nDuplicates,
'duplicates found')
170 print (taskman.getNTasks(),
'task(s) found:\n')
172 for t
in taskman.taskIterDict():
173 print (
'\n\nTask {} / {}:\n'.
format(
176 print (pprint.pformat(t))
178 for t
in taskman.taskIter():
182 elif len(cmdargs) == 2:
184 taskname = cmdargs[1]
189 addWildCards=
not options.nowildcards)
190 except TaskManagerCheckError
as e:
193 taskEntry = taskman.getTaskDict(t[0], t[1])
194 if options.pretty
is None or options.pretty:
195 print (
'\n\nTask {} / {}:\n'.
format(
197 taskEntry[
'TASKNAME']))
198 print (pprint.pformat(taskEntry))
203 else: parser.error(
'Command dump takes either 0 or 2 arguments')
209 if cmd==
'show' and (len(args)==2
or len(args)==3):
211 taskname = args[2]
if len(args)>2
else ''
214 taskList =
getFullTaskNames(taskman,dsname,taskname,addWildCards=
not options.nowildcards)
216 print (
" %-50s %s" % (
'DATASET NAME',
'TASK NAME'))
217 print (
" %s" % (75*
'-'))
219 print (
" %-50s %s" % (t[0],t[1]) )
220 print (
'\n%i task(s) found\n' % len(taskList))
222 except TaskManagerCheckError
as e:
230 if cmd==
'update' and len(args)==1:
231 if cmdargs: parser.error(
'Command update does not take any arguments')
234 for t
in taskman.taskIterDict(
'DSNAME,TASKNAME', [
235 'where STATUS < {:d} and ONDISK < {:d}'.
format(
236 TaskManager.StatusCodes[
'POSTPROCESSING'],
237 TaskManager.OnDiskCodes[
'ARCHIVED'])]):
238 a = TaskAnalyzer(proddir, t[
'DSNAME'], t[
'TASKNAME'])
240 print (
'Updating task {}/{}'.
format(t[
'DSNAME'], t[
'TASKNAME']))
241 a.updateStatus(taskman)
243 taskman.setDiskStatus(t[
'DSNAME'], t[
'TASKNAME'],
244 TaskManager.OnDiskCodes[
'DELETED'])
# Command "update DSNAME TASKNAME": update information of the tasks matching
# DSNAME/TASKNAME (wildcards are added unless --nowildcards; the user is asked
# for confirmation unless --batch).
247 if cmd==
'update' and len(args)==3:
250 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
251 except TaskManagerCheckError
as e:
# Each entry t is a (DSNAME, TASKNAME) pair, as used by the TaskAnalyzer call.
255 a = TaskAnalyzer(proddir,t[0],t[1])
# NOTE(review): BUG - t[0] (DSNAME) is passed twice in the progress message
# and in setDiskStatus below; the task name is t[1], as in the TaskAnalyzer
# call above. As written, setDiskStatus targets a task named after the dataset
# rather than the selected task. Both should use (t[0],t[1]).
257 print (
'Updating task %s/%s' % (t[0],t[0]))
258 a.updateStatus(taskman)
260 taskman.setDiskStatus(t[0],t[0],TaskManager.OnDiskCodes[
'DELETED'])
268 if len(cmdargs) == 2:
270 taskname = cmdargs[1]
271 taskpath = os.path.join(proddir, dsname, taskname)
273 taskpath = os.path.join(proddir,
'*',
'*')
275 parser.error(
'Command rebuild takes 0 or 2 arguments')
277 print (
'Will reconstruct database from directory: {}'.
format(taskpath))
282 if not options.batch:
283 print (
'Rebuilding the following tasks:')
285 print (
' DB {:50} {}'.
format(
'DATASET NAME',
'TASK NAME'))
286 print (
' {}'.
format(85 *
'-'))
287 for p
in glob.glob(taskpath):
288 if not os.path.isdir(p):
continue
289 (dsName, taskName) = p.split(
'/')[-2:]
290 nDefined = taskman.getNTasks([
291 'where DSNAME=', DbParam(dsName),
292 'and TASKNAME=', DbParam(taskName)])
293 dirs.append((dsName, taskName, nDefined))
296 print (
' {:50} {}'.
format(dsName, taskName))
297 elif options.verbose:
298 print (
' * {:50} {}'.
format(dsName, taskName))
300 a = raw_input(
'\nARE YOU SURE [n] ? ')
302 sys.exit(
'ERROR: Rebuilding aborted by user')
305 for (dsName, taskName, nDefined)
in dirs:
307 print (
'Adding missing entry for task {}/{}'.
format(dsName, taskName))
308 configFile = glob.glob(os.path.join(proddir, dsName, taskName,
'*',
'*.config.py'))
310 mtime = os.stat(configFile[0])[stat.ST_MTIME]
314 template = config[
'jobConfig'][
'joboptionpath']
318 release = config[
'jobConfig'][
'release']
322 taskpostprocsteps = config[
'jobConfig'][
'taskpostprocsteps']
324 taskpostprocsteps =
''
326 comment = config[
'jobConfig'][
'comment'] +
' (rebuilt from config file)'
328 comment =
'rebuilt from config file'
329 taskman.addTask(dsName, taskName, template, release, 0, taskpostprocsteps,
331 createdUser=
'UNKNOWN',
332 createdHost=
'UNKNOWN',
335 print (
'WARNING: no config file found, unable to determine task creation time and configuration details')
337 taskman.addTask(dsName, taskName,
'UNKNOWN',
'UNKNOWN', 0,
338 comment=
'rebuilt, no config file found')
339 a = TaskAnalyzer(proddir, dsName, taskName)
341 a.updateStatus(taskman)
343 print (
'Entry exists already for task {}/{} - SKIPPED'.
format(dsName, taskName))
345 print (
'ERROR: {:d} conflicting task entries for task {}/{} - SKIPPED'.
format(nDefined, dsName, taskName))
# Command "import": parse "[DSNAME TASKNAME] DBCONN" and build the WHERE
# qualifier used to select the tasks to import from DBCONN.
353 if len(cmdargs) == 1:
354 fromDbconn = cmdargs[0]
356 elif len(cmdargs) == 3:
358 taskname = cmdargs[1]
359 fromDbconn = cmdargs[2]
# Translation table mapping shell-style wildcards to SQL LIKE wildcards:
# '*' -> '%', '?' -> '_'.
# NOTE(review): string.maketrans exists only in Python 2. This script otherwise
# uses Python-3-only APIs (subprocess.getstatusoutput, input()), so under
# Python 3 this line raises AttributeError. Use str.maketrans('*?', '%_').
363 wildcards = string.maketrans(
'*?',
'%_')
# NOTE(review): wildcard patterns are formatted directly into the SQL text,
# whereas exact matches below are passed as DbParam. A quote in DSNAME or
# TASKNAME breaks (or alters) the query. Presumably the LIKE pattern could be
# passed as a DbParam as well - verify against the DbParam implementation.
365 if '*' in dsname
or '?' in dsname:
366 qual.append(
"where DSNAME like '{}'".
format(dsname.translate(wildcards)))
368 qual.extend([
'where DSNAME =', DbParam(dsname)])
369 if '*' in taskname
or '?' in taskname:
370 qual.append(
"and TASKNAME like '{}'".
format(taskname.translate(wildcards)))
372 qual.extend([
'and TASKNAME =', DbParam(taskname)])
374 parser.error(
'Command import takes 1 or 3 arguments')
377 destDbtype, destDbname = TaskManager.parseConnectionInfo(options.dbconn)
378 fromDbtype, fromDbname = TaskManager.parseConnectionInfo(fromDbconn)
379 except ValueError
as e:
380 sys.exit(
'ERROR: Bad database connection string {}'.
format(e))
382 print (
'Import from: {}:{}'.
format(fromDbtype, fromDbname))
383 print (
'Import into: {}:{}'.
format(destDbtype, destDbname))
389 qual.append(
'order by RUNNR desc')
390 for t
in fromman.taskIterDict(qual=qual):
391 print (
'Importing ', t[
'DSNAME'],
'/' , t[
'TASKNAME'],
'...')
392 if not 'RUNNR' in t.keys():
395 if not t.get(
'RESULTLINKS'):
398 files = t.get(
'RESULTFILES',
'')
402 taskname = t[
'TASKNAME']
403 summaryFiles = glob.glob(
'%s/%s/*beamspot.gif' % (dsname,taskname))
404 if len(summaryFiles)>0:
405 for r
in summaryFiles:
408 if not f
in files.split():
409 files =
' '.
join([files,f])
410 if not pdf
in files.split():
411 files =
' '.
join([files,pdf])
412 links +=
' <a href="../files?u=%s/%s/%s">summary</a>' %(dsname,taskname,f)
413 links +=
' (<a href="/jobfiles/%s/%s/%s">pdf</a>)' %(dsname,taskname,pdf)
414 monitoringFiles = glob.glob(
'%s/%s/*beamspotmon.gif' % (dsname,taskname))
415 if len(monitoringFiles)>0:
416 for r
in monitoringFiles:
419 if not f
in files.split():
420 files =
' '.
join([files,f])
421 if not pdf
in files.split():
422 files =
' '.
join([files,pdf])
423 links +=
' <a href="../files?u=%s/%s/%s">monitoring</a>' %(dsname,taskname,f)
424 links +=
' (<a href="/jobfiles/%s/%s/%s">pdf</a>)' %(dsname,taskname,pdf)
425 t[
'RESULTFILES'] = files
426 t[
'RESULTLINKS'] = links
430 for fieldName
in t.keys():
431 newFieldName = fieldName
432 if fieldName ==
'TASKID':
434 elif fieldName ==
'COMMENT':
435 newFieldName =
'TASKCOMMENT'
437 fieldNameString +=
','
439 fieldNameString += newFieldName
440 params.append(DbParam(t[fieldName]))
441 q = [
'insert into TASKS (%s) values (' % fieldNameString ]
444 taskman.execute(q,
True)
447 print (
'{:d} tasks imported'.
format(nImported))
454 if cmd ==
'setstatus':
455 if len(cmdargs) != 3: parser.error(
'Command setstatus takes 3 arguments')
457 taskname = cmdargs[1]
460 status = TaskManager.StatusCodes.get(statusName)
462 print (
'Setting task status to {} (code {:d})'.
format(statusName,status))
464 sys.exit(
'ERROR: Illegal status code name {}'.
format(statusName))
470 confirmWithUser=
not options.batch,
471 addWildCards=
not options.nowildcards)
472 except TaskManagerCheckError
as e:
477 n += taskman.setValue(t[0], t[1],
'STATUS', status)
478 print (
'%i task status entries updated\n' % (n))
485 if cmd ==
'setfield':
486 if len(cmdargs) != 4: parser.error(
'Command setfield takes 4 arguments')
488 taskname = cmdargs[1]
490 fieldValue = cmdargs[3]
492 print (
'Setting field %s to: %s' % (fieldName,fieldValue))
497 requireSingleTask=
True,
498 confirmWithUser=
not options.batch,
499 addWildCards=
not options.nowildcards)
500 except TaskManagerCheckError
as e:
502 n = taskman.setValue(dsname,task,fieldName,fieldValue)
503 print (
'{:d} task status entries updated'.
format(n))
510 if cmd==
'delete' and len(args)==3:
513 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
514 except TaskManagerCheckError
as e:
519 n += taskman.deleteTask(t[0],t[1])
520 print (
'\n%i task entries deleted\n' % (n))
525 if cmd==
'listResults' and len(args)==3:
528 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
529 except TaskManagerCheckError
as e:
533 print (
'Listing results for task %s/%s ...' % (t[0],t[1]) )
534 print (taskman.getTaskValue(t[0],t[1],
'RESULTFILES'))
535 print (taskman.getTaskValue(t[0],t[1],
'RESULTLINKS'))
543 if cmd==
'deleteResults' and len(args)==3:
546 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
547 except TaskManagerCheckError
as e:
551 print (
'Deleting results for task %s/%s ...' % (t[0],t[1]) )
552 taskman.setValue(t[0],t[1],
'RESULTFILES',
'')
553 taskman.setValue(t[0],t[1],
'RESULTLINKS',
'')
# Command "deleteTask DSNAME TASKNAME": resolve exactly one task
# (requireSingleTask=True), show the disk usage of its directory, ask for
# confirmation unless --batch, then delete the database entry and remove the
# task directory.
560 if cmd==
'deleteTask' and len(args)==3:
563 [(dsname,task)] =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,requireSingleTask=
True,addWildCards=
not options.nowildcards)
564 except TaskManagerCheckError
as e:
# NOTE(review): 'dir' shadows the builtin of the same name; os.path.join would
# be the idiomatic way to build this path.
567 dir =
'/'.
join([proddir,dsname,task])
# NOTE(review): the path is interpolated unquoted into shell commands here
# ('du -hs') and below ('rm -rf'). A name containing spaces or shell
# metacharacters could make 'rm -rf' remove unintended paths. Prefer
# shutil.rmtree(dir) (needs 'import shutil') or at least shlex.quote(dir).
569 if os.path.exists(dir):
570 os.system(
'du -hs %s' % dir)
572 print (
'No task files found (no directory %s)' % dir)
573 if not options.batch:
# NOTE(review): typo 'Deleteing' in the prompt below (should be 'Deleting').
574 a = input(
'\nDeleteing task entry and all files - ARE YOU SURE [n] ? ')
576 print (
"ERROR: Deletion aborted by user.")
578 n = taskman.deleteTask(dsname,task)
579 print (
'\n%i task entry deleted.' % (n))
580 print (
'Deleting ',dir,
' ...')
581 os.system(
'rm -rf %s' % dir)
584 if cmd==
'deleteTask' and len(args)==2:
589 taskList =
getFullTaskNames(taskman,dsname,taskname,addWildCards=
not options.nowildcards)
590 except TaskManagerCheckError
as e:
598 print (
'\n%i task(s) found\n' % len(taskList))
602 print (
" %-50s %s" % (dsname,taskname) )
603 dir =
'/'.
join([proddir,dsname,taskname])
605 if os.path.exists(dir):
606 os.system(
'du -hs %s' % dir)
608 print (
'No task files found (no directory %s)' % dir)
609 if not options.batch:
610 a = input(
'\nDeleteing task entry and all files - ARE YOU SURE [n] ? ')
612 print (
"ERROR: Deletion aborted by user.")
614 n += taskman.deleteTask(dsname,taskname)
615 print (
'Deleting ',dir,
' ...')
616 os.system(
'rm -rf %s' % dir)
617 print (
'\n%i tasks deleted.' % (n))
# Command "notifyFailed [SECONDS]": collect tasks whose TASKNAME contains
# --runtaskname, that were updated within the last SECONDS (or at any time if
# no argument is given) and whose status is FAILED or POSTPROCFAILED. A summary
# table is then mailed via the atlidbs mailwrapper script.
620 if cmd==
'notifyFailed' and len(args)<3:
622 earliestUpdateTime = time.time()-
float(args[1])
624 earliestUpdateTime = 0
# NOTE(review): options.runtaskname is formatted into the SQL text (a
# substring LIKE match), unlike UPDATED, which is passed as DbParam. A quote in
# the option value breaks the query.
628 for t
in taskman.taskIterDict(
'*',[
"where TASKNAME like '%%%s%%'" % options.runtaskname,
629 "and UPDATED >= ", DbParam(earliestUpdateTime),
630 "and (STATUS = %i or STATUS = %i)" %(TaskManager.StatusCodes[
'FAILED'], TaskManager.StatusCodes[
'POSTPROCFAILED']),
631 "order by RUNNR desc"]):
# Replace the numeric STATUS code with its name (reverse lookup in StatusCodes).
634 t[
'STATUS'] = [k
for k, v
in TaskManager.StatusCodes.items()
if v == t[
'STATUS']][0]
# Column widths = length of the longest DSNAME / TASKNAME / STATUS value.
# NOTE(review): max() raises ValueError on an empty taskList; confirm that the
# no-failures case is handled before this point.
639 dsWidth = len(
max([t[
'DSNAME']
for t
in taskList],key=len))
640 taskWidth = len(
max([t[
'TASKNAME']
for t
in taskList],key=len))
641 statusWidth = len(
max([t[
'STATUS']
for t
in taskList],key=len))
# Builds e.g. '%(DSNAME)30s %(TASKNAME)20s %(STATUS)14s': right-aligned
# columns, filled below with each task dict via '%' formatting.
643 bodyFormat =
'%%(DSNAME)%ss %%(TASKNAME)%ss %%(STATUS)%ss' %(dsWidth, taskWidth, statusWidth)
645 mailBody =
'The following %s tasks have reported failures' % len(taskList)
648 mailBody +=
' in the last %s hours' % hours
650 mailBody +=
'\n'.
join([bodyFormat % t
for t
in taskList])
# os.system returns the wait status; '>> 8' extracts the shell's exit code.
# NOTE(review): mailBody (containing task names from the database) is placed
# inside a double-quoted shell argument, so '"', '$' or backticks in a name
# are interpreted by the shell. Consider passing it via stdin or an argument
# list with subprocess.
# NOTE(review): assigning to 'stat' rebinds the name of the stat module used
# by the rebuild command (stat.ST_MTIME); a different variable name would be
# safer.
651 stat = os.system(
'source /afs/cern.ch/user/a/atlidbs/cron/mailwrapper; mailwrapper "[atlidbs] %s tasks report failures" "%s"' %(len(taskList), mailBody)) >> 8
654 print (
'\nERROR: Unable to send mail\n')
664 if cmd==
'debug' and len(args)==1:
667 os.environ[
'PYTHONINSPECT'] =
'1'
669 print (
'Entering python shell. taskman is the TaskManager object. Be careful!')
672 print (
'ERROR: Illegal command or number of arguments')