7 taskman is a command line utility to run TaskManager functions.
9 __author__ =
'Juerg Beringer'
10 __version__ =
'taskman.py atlas/athena'
11 __usage__ =
'''%prog [options] taskdbconn command [args ...]
15 init Create a new database
16 checkdb Check (and fix) sqlite database
17 checkdup Check task database for duplicate entries
18 debug Interactive debugging (EXPERTS ONLY!)
19 dump [DSNAME TASKNAME] Dump all or selected tasks in database
20 show DSNAME [TASKNAME] Show list of tasks
21 update Update information of ongoing tasks
22 update DSNAME TASKNAME Update information of selected tasks
23 rebuild [DSNAME TASKNAME] Rebuild missing or selected database entries from job files
24 import [DSNAME TASKNAME] DBCONN Import all or selected tasks from database DBCONN (duplicates not checked)
25 setstatus DSNAME TASKNAME STATUS Set status of selected tasks to STATUS
26 setfield DSNAME TASKNAME FIELDNAME VALUE Set database field to value (experts only!)
27 delete DSNAME TASKNAME Delete entry for selected task(s) (task files are NOT removed)
28 deleteResults DSNAME TASKNAME Delete postprocessing results for selected set of tasks
29 deleteTask DSNAME TASKNAME Delete task files and corresponding TaskManager entry
30 for a single task (will prompt for confirmation unless
31 option --batch is used)
37 from InDetBeamSpotExample.Utils
import getRunFromName
39 from future
import standard_library
40 standard_library.install_aliases()
44 from optparse
import OptionParser
# Command line definition for taskman: global options first, followed by the
# positional "command [args ...]" part that is left in ``args`` after parsing.
parser = OptionParser(usage=__usage__, version=__version__)

# Confirmation / verbosity switches.
parser.add_option('-b', '--batch', dest='batch', action='store_true', default=False,
                  help='batch mode - never ask for confirmation')
parser.add_option('-v', '--verbose', dest='verbose', action='store_true', default=False,
                  help='enable more verbose output for some commands')

# Task lookup and data location.
parser.add_option('-n', '--nowildcards', dest='nowildcards', action='store_true', default=False,
                  help='do not add wildcards when looking up dataset and task names')
parser.add_option('-d', '--dbconn', dest='dbconn', default='',
                  help='task manager database connection string '
                       '(default: check TASKDB, otherwise use sqlite_file:taskdata.db)')
parser.add_option('--proddir', dest='proddir', default='.',
                  help='production directory (default: ".")')

# --pretty/--no-pretty share one destination; it stays None ("auto") when
# neither switch is given.
parser.add_option('-p', '--pretty', dest='pretty', action='store_true', default=None,
                  help='try to nicely format output (default: auto)')
parser.add_option('--no-pretty', dest='pretty', action='store_false',
                  help='do not attempt to format output')

parser.add_option('--runtaskname', dest='runtaskname', default='CB_BEAMSPOT',
                  help='task name')

(options, args) = parser.parse_args()
58 parser.error(
'wrong number of command line arguments')
62 proddir = options.proddir
63 if not os.path.exists(proddir):
64 sys.exit(
'ERROR: Job directory %s does not exist or is unreadable' % proddir)
67 ''' Open task manager '''
68 dbconn = dbconn
or options.dbconn
70 return TaskManager(dbconn)
72 print (
'ERROR: Unable to access task manager database %s' % dbconn)
79 if cmdargs: parser.error(
'Command init does not take arguments')
81 dbtype, dbname = TaskManager.parseConnectionInfo(options.dbconn)
82 except ValueError
as e:
83 sys.exit(
'ERROR: {}'.
format(e))
84 print (
'Will initialise schema for database: {}:{}'.
format(dbtype, dbname))
87 print (
'Checking for existing database ...')
89 with TaskManager(options.dbconn):
pass
90 except ValueError
as e:
91 print (
'Test connection failed: {}'.
format(e))
93 print (
'Test connection succeeded: the database already exists!')
# Ask for explicit confirmation before the schema is recreated.  input() is
# used (as in the deleteTask prompts) because raw_input() does not exist on
# Python 3.
a = input('\nRECREATING TASK DATABASE SCHEMA - ANY EXISTING DATA WILL BE ERASED!\n\nARE YOU REALLY ABSOLUTELY SURE [n] ? ')
97 if a !=
'y': sys.exit(
'ERROR: Rebuilding aborted by user')
99 with TaskManager(options.dbconn, createDatabase=
True):
pass
107 if cmdargs: parser.error(
'Command checkdb does not take arguments')
109 dbtype, dbfile = TaskManager.parseConnectionInfo(options.dbconn)
111 sys.exit(
"ERROR: Illegal or empty/default database connection string - must provide explicity SQLite file reference")
112 if dbtype !=
'sqlite_file':
113 sys.exit(
'ERROR: checkdb is only supported for SQLite databases')
114 if not os.path.exists(dbfile):
115 sys.exit(
'ERROR: SQLite file %s does not exist' % dbfile)
117 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'pragma integrity_check;'" % dbfile)
119 sys.exit(
'ERROR: Error executing sqlite3 command')
121 print (
'ERROR: SQLite database file has errors:')
125 if not options.batch:
# Ask before attempting a VACUUM on the damaged file.  input() is used (as in
# the deleteTask prompts) because raw_input() does not exist on Python 3.
a = input('Do you want to try VACUUM [n] ? ')
128 sys.exit(
'\nERROR: VACUUM not executed - please fix database file manually')
129 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'vacuum;'" % dbfile)
132 sys.exit(
'ERROR: VACUUM failed')
134 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'pragma integrity_check;'" % dbfile)
135 if status != 0
or output !=
'ok':
137 sys.exit(
'ERROR: Integrity check still failed - please check')
138 print (
'INFO: SQLite file now passes integrity test (content may still have errors)')
141 print (
'INFO: SQLite file {} ok'.
format(dbfile))
148 if cmd ==
'checkdup':
149 if cmdargs: parser.error(
'Command checkdup does not take arguments')
153 for t
in taskman.taskIter(
'dsname,taskname,count(*)', [
'group by dsname,taskname']):
156 print (
'Duplicate task: {} / {}'.
format(t[0], t[1]))
157 for d
in taskman.taskIterDict(qual=[
158 'where DSNAME =', DbParam(t[0]),
159 'and TASKNAME =', DbParam(t[1]),
161 print (
' TASKID =', d[
'TASKID'])
162 print (nDuplicates,
'duplicates found')
172 print (taskman.getNTasks(),
'task(s) found:\n')
174 for t
in taskman.taskIterDict():
175 print (
'\n\nTask {} / {}:\n'.
format(
178 print (pprint.pformat(t))
180 for t
in taskman.taskIter():
184 elif len(cmdargs) == 2:
186 taskname = cmdargs[1]
191 addWildCards=
not options.nowildcards)
192 except TaskManagerCheckError
as e:
195 taskEntry = taskman.getTaskDict(t[0], t[1])
196 if options.pretty
is None or options.pretty:
197 print (
'\n\nTask {} / {}:\n'.
format(
199 taskEntry[
'TASKNAME']))
200 print (pprint.pformat(taskEntry))
205 else: parser.error(
'Command dump takes either 0 or 2 arguments')
211 if cmd==
'show' and (len(args)==2
or len(args)==3):
213 taskname = args[2]
if len(args)>2
else ''
216 taskList =
getFullTaskNames(taskman,dsname,taskname,addWildCards=
not options.nowildcards)
218 print (
" %-50s %s" % (
'DATASET NAME',
'TASK NAME'))
219 print (
" %s" % (75*
'-'))
221 print (
" %-50s %s" % (t[0],t[1]) )
222 print (
'\n%i task(s) found\n' % len(taskList))
224 except TaskManagerCheckError
as e:
232 if cmd==
'update' and len(args)==1:
233 if cmdargs: parser.error(
'Command update does not take any arguments')
236 for t
in taskman.taskIterDict(
'DSNAME,TASKNAME', [
237 'where STATUS < {:d} and ONDISK < {:d}'.
format(
238 TaskManager.StatusCodes[
'POSTPROCESSING'],
239 TaskManager.OnDiskCodes[
'ARCHIVED'])]):
240 a = TaskAnalyzer(proddir, t[
'DSNAME'], t[
'TASKNAME'])
242 print (
'Updating task {}/{}'.
format(t[
'DSNAME'], t[
'TASKNAME']))
243 a.updateStatus(taskman)
245 taskman.setDiskStatus(t[
'DSNAME'], t[
'TASKNAME'],
246 TaskManager.OnDiskCodes[
'DELETED'])
249 if cmd==
'update' and len(args)==3:
252 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
253 except TaskManagerCheckError
as e:
257 a = TaskAnalyzer(proddir,t[0],t[1])
# Report dataset name AND task name (t is a (dsname, taskname) pair); the
# dataset name was previously printed twice.
print ('Updating task %s/%s' % (t[0], t[1]))
260 a.updateStatus(taskman)
# Flag the task identified by (dataset name, task name) as DELETED.  The
# second argument must be the task name t[1], not the dataset name again.
taskman.setDiskStatus(t[0], t[1], TaskManager.OnDiskCodes['DELETED'])
270 if len(cmdargs) == 2:
272 taskname = cmdargs[1]
273 taskpath = os.path.join(proddir, dsname, taskname)
275 taskpath = os.path.join(proddir,
'*',
'*')
277 parser.error(
'Command rebuild takes 0 or 2 arguments')
279 print (
'Will reconstruct database from directory: {}'.
format(taskpath))
284 if not options.batch:
285 print (
'Rebuilding the following tasks:')
287 print (
' DB {:50} {}'.
format(
'DATASET NAME',
'TASK NAME'))
288 print (
' {}'.
format(85 *
'-'))
289 for p
in glob.glob(taskpath):
290 if not os.path.isdir(p):
continue
291 (dsName, taskName) = p.split(
'/')[-2:]
292 nDefined = taskman.getNTasks([
293 'where DSNAME=', DbParam(dsName),
294 'and TASKNAME=', DbParam(taskName)])
295 dirs.append((dsName, taskName, nDefined))
298 print (
' {:50} {}'.
format(dsName, taskName))
299 elif options.verbose:
300 print (
' * {:50} {}'.
format(dsName, taskName))
# Ask for confirmation before rebuilding the listed tasks.  input() is used
# (as in the deleteTask prompts) because raw_input() does not exist on
# Python 3.
a = input('\nARE YOU SURE [n] ? ')
304 sys.exit(
'ERROR: Rebuilding aborted by user')
307 for (dsName, taskName, nDefined)
in dirs:
309 print (
'Adding missing entry for task {}/{}'.
format(dsName, taskName))
310 configFile = glob.glob(os.path.join(proddir, dsName, taskName,
'*',
'*.config.py'))
312 mtime = os.stat(configFile[0])[stat.ST_MTIME]
316 template = config[
'jobConfig'][
'joboptionpath']
320 release = config[
'jobConfig'][
'release']
324 taskpostprocsteps = config[
'jobConfig'][
'taskpostprocsteps']
326 taskpostprocsteps =
''
328 comment = config[
'jobConfig'][
'comment'] +
' (rebuilt from config file)'
330 comment =
'rebuilt from config file'
331 taskman.addTask(dsName, taskName, template, release, 0, taskpostprocsteps,
333 createdUser=
'UNKNOWN',
334 createdHost=
'UNKNOWN',
337 print (
'WARNING: no config file found, unable to determine task creation time and configuration details')
339 taskman.addTask(dsName, taskName,
'UNKNOWN',
'UNKNOWN', 0,
340 comment=
'rebuilt, no config file found')
341 a = TaskAnalyzer(proddir, dsName, taskName)
343 a.updateStatus(taskman)
345 print (
'Entry exists already for task {}/{} - SKIPPED'.
format(dsName, taskName))
347 print (
'ERROR: {:d} conflicting task entries for task {}/{} - SKIPPED'.
format(nDefined, dsName, taskName))
355 if len(cmdargs) == 1:
356 fromDbconn = cmdargs[0]
358 elif len(cmdargs) == 3:
360 taskname = cmdargs[1]
361 fromDbconn = cmdargs[2]
# Translation table mapping shell-style wildcards (* and ?) to their SQL LIKE
# equivalents (% and _).  string.maketrans() only exists on Python 2; on
# Python 3 the table builder is str.maketrans(), so use whichever is present.
_maketrans = getattr(str, 'maketrans', None) or string.maketrans
wildcards = _maketrans('*?', '%_')
367 if '*' in dsname
or '?' in dsname:
# Pass the LIKE pattern as a bound parameter, as the exact-match case does,
# instead of splicing it into the SQL text; a quote character in the name
# can then no longer break or alter the statement.
qual.extend(['where DSNAME like', DbParam(dsname.translate(wildcards))])
370 qual.extend([
'where DSNAME =', DbParam(dsname)])
371 if '*' in taskname
or '?' in taskname:
# Pass the LIKE pattern as a bound parameter, as the exact-match case does,
# instead of splicing it into the SQL text; a quote character in the name
# can then no longer break or alter the statement.
qual.extend(['and TASKNAME like', DbParam(taskname.translate(wildcards))])
374 qual.extend([
'and TASKNAME =', DbParam(taskname)])
376 parser.error(
'Command import takes 1 or 3 arguments')
379 destDbtype, destDbname = TaskManager.parseConnectionInfo(options.dbconn)
380 fromDbtype, fromDbname = TaskManager.parseConnectionInfo(fromDbconn)
381 except ValueError
as e:
382 sys.exit(
'ERROR: Bad database connection string {}'.
format(e))
384 print (
'Import from: {}:{}'.
format(fromDbtype, fromDbname))
385 print (
'Import into: {}:{}'.
format(destDbtype, destDbname))
391 qual.append(
'order by RUNNR desc')
392 for t
in fromman.taskIterDict(qual=qual):
393 print (
'Importing ', t[
'DSNAME'],
'/' , t[
'TASKNAME'],
'...')
394 if not 'RUNNR' in t.keys():
397 if not t.get(
'RESULTLINKS'):
400 files = t.get(
'RESULTFILES',
'')
404 taskname = t[
'TASKNAME']
405 summaryFiles = glob.glob(
'%s/%s/*beamspot.gif' % (dsname,taskname))
406 if len(summaryFiles)>0:
407 for r
in summaryFiles:
410 if not f
in files.split():
411 files =
' '.
join([files,f])
412 if not pdf
in files.split():
413 files =
' '.
join([files,pdf])
414 links +=
' <a href="../files?u=%s/%s/%s">summary</a>' %(dsname,taskname,f)
415 links +=
' (<a href="/jobfiles/%s/%s/%s">pdf</a>)' %(dsname,taskname,pdf)
416 monitoringFiles = glob.glob(
'%s/%s/*beamspotmon.gif' % (dsname,taskname))
417 if len(monitoringFiles)>0:
418 for r
in monitoringFiles:
421 if not f
in files.split():
422 files =
' '.
join([files,f])
423 if not pdf
in files.split():
424 files =
' '.
join([files,pdf])
425 links +=
' <a href="../files?u=%s/%s/%s">monitoring</a>' %(dsname,taskname,f)
426 links +=
' (<a href="/jobfiles/%s/%s/%s">pdf</a>)' %(dsname,taskname,pdf)
427 t[
'RESULTFILES'] = files
428 t[
'RESULTLINKS'] = links
432 for fieldName
in t.keys():
433 newFieldName = fieldName
434 if fieldName ==
'TASKID':
436 elif fieldName ==
'COMMENT':
437 newFieldName =
'TASKCOMMENT'
439 fieldNameString +=
','
441 fieldNameString += newFieldName
442 params.append(DbParam(t[fieldName]))
443 q = [
'insert into TASKS (%s) values (' % fieldNameString ]
446 taskman.execute(q,
True)
449 print (
'{:d} tasks imported'.
format(nImported))
456 if cmd ==
'setstatus':
457 if len(cmdargs) != 3: parser.error(
'Command setstatus takes 3 arguments')
459 taskname = cmdargs[1]
462 status = TaskManager.StatusCodes.get(statusName)
464 print (
'Setting task status to {} (code {:d})'.
format(statusName,status))
466 sys.exit(
'ERROR: Illegal status code name {}'.
format(statusName))
472 confirmWithUser=
not options.batch,
473 addWildCards=
not options.nowildcards)
474 except TaskManagerCheckError
as e:
479 n += taskman.setValue(t[0], t[1],
'STATUS', status)
480 print (
'%i task status entries updated\n' % (n))
487 if cmd ==
'setfield':
488 if len(cmdargs) != 4: parser.error(
'Command setfield takes 4 arguments')
490 taskname = cmdargs[1]
492 fieldValue = cmdargs[3]
494 print (
'Setting field %s to: %s' % (fieldName,fieldValue))
499 requireSingleTask=
True,
500 confirmWithUser=
not options.batch,
501 addWildCards=
not options.nowildcards)
502 except TaskManagerCheckError
as e:
504 n = taskman.setValue(dsname,task,fieldName,fieldValue)
505 print (
'{:d} task status entries updated'.
format(n))
512 if cmd==
'delete' and len(args)==3:
515 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
516 except TaskManagerCheckError
as e:
521 n += taskman.deleteTask(t[0],t[1])
522 print (
'\n%i task entries deleted\n' % (n))
527 if cmd==
'listResults' and len(args)==3:
530 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
531 except TaskManagerCheckError
as e:
535 print (
'Listing results for task %s/%s ...' % (t[0],t[1]) )
536 print (taskman.getTaskValue(t[0],t[1],
'RESULTFILES'))
537 print (taskman.getTaskValue(t[0],t[1],
'RESULTLINKS'))
545 if cmd==
'deleteResults' and len(args)==3:
548 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
549 except TaskManagerCheckError
as e:
553 print (
'Deleting results for task %s/%s ...' % (t[0],t[1]) )
554 taskman.setValue(t[0],t[1],
'RESULTFILES',
'')
555 taskman.setValue(t[0],t[1],
'RESULTLINKS',
'')
562 if cmd==
'deleteTask' and len(args)==3:
565 [(dsname,task)] =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,requireSingleTask=
True,addWildCards=
not options.nowildcards)
566 except TaskManagerCheckError
as e:
569 dir =
'/'.
join([proddir,dsname,task])
571 if os.path.exists(dir):
572 os.system(
'du -hs %s' % dir)
574 print (
'No task files found (no directory %s)' % dir)
575 if not options.batch:
576 a = input(
'\nDeleteing task entry and all files - ARE YOU SURE [n] ? ')
578 print (
"ERROR: Deletion aborted by user.")
580 n = taskman.deleteTask(dsname,task)
581 print (
'\n%i task entry deleted.' % (n))
582 print (
'Deleting ',dir,
' ...')
583 os.system(
'rm -rf %s' % dir)
586 if cmd==
'deleteTask' and len(args)==2:
591 taskList =
getFullTaskNames(taskman,dsname,taskname,addWildCards=
not options.nowildcards)
592 except TaskManagerCheckError
as e:
600 print (
'\n%i task(s) found\n' % len(taskList))
604 print (
" %-50s %s" % (dsname,taskname) )
605 dir =
'/'.
join([proddir,dsname,taskname])
607 if os.path.exists(dir):
608 os.system(
'du -hs %s' % dir)
610 print (
'No task files found (no directory %s)' % dir)
611 if not options.batch:
612 a = input(
'\nDeleteing task entry and all files - ARE YOU SURE [n] ? ')
614 print (
"ERROR: Deletion aborted by user.")
616 n += taskman.deleteTask(dsname,taskname)
617 print (
'Deleting ',dir,
' ...')
618 os.system(
'rm -rf %s' % dir)
619 print (
'\n%i tasks deleted.' % (n))
622 if cmd==
'notifyFailed' and len(args)<3:
624 earliestUpdateTime = time.time()-
float(args[1])
626 earliestUpdateTime = 0
630 for t
in taskman.taskIterDict(
'*',[
"where TASKNAME like '%%%s%%'" % options.runtaskname,
631 "and UPDATED >= ", DbParam(earliestUpdateTime),
632 "and (STATUS = %i or STATUS = %i)" %(TaskManager.StatusCodes[
'FAILED'], TaskManager.StatusCodes[
'POSTPROCFAILED']),
633 "order by RUNNR desc"]):
636 t[
'STATUS'] = [k
for k, v
in TaskManager.StatusCodes.items()
if v == t[
'STATUS']][0]
641 dsWidth = len(
max([t[
'DSNAME']
for t
in taskList],key=len))
642 taskWidth = len(
max([t[
'TASKNAME']
for t
in taskList],key=len))
643 statusWidth = len(
max([t[
'STATUS']
for t
in taskList],key=len))
645 bodyFormat =
'%%(DSNAME)%ss %%(TASKNAME)%ss %%(STATUS)%ss' %(dsWidth, taskWidth, statusWidth)
647 mailBody =
'The following %s tasks have reported failures' % len(taskList)
650 mailBody +=
' in the last %s hours' % hours
652 mailBody +=
'\n'.
join([bodyFormat % t
for t
in taskList])
653 stat = os.system(
'source /afs/cern.ch/user/a/atlidbs/cron/mailwrapper; mailwrapper "[atlidbs] %s tasks report failures" "%s"' %(len(taskList), mailBody)) >> 8
656 print (
'\nERROR: Unable to send mail\n')
666 if cmd==
'debug' and len(args)==1:
669 os.environ[
'PYTHONINSPECT'] =
'1'
671 print (
'Entering python shell. taskman is the TaskManager object. Be careful!')
674 print (
'ERROR: Illegal command or number of arguments')