5 from __future__
import print_function
8 taskman is a command line utility to run TaskManager functions.
10 __author__ =
'Juerg Beringer'
11 __version__ =
'taskman.py atlas/athena'
12 __usage__ =
'''%prog [options] taskdbconn command [args ...]
16 init Create a new database
17 checkdb Check (and fix) sqlite database
18 checkdup Check task database for duplicate entries
19 debug Interactive debugging (EXPERTS ONLY!)
20 dump [DSNAME TASKNAME] Dump all or selected tasks in database
21 show DSNAME [TASKNAME] Show list of tasks
22 update Update information of ongoing tasks
23 update DSNAME TASKNAME Update information of selected tasks
24 rebuild [DSNAME TASKNAME] Rebuild missing or selected database entries from job files
25 import [DSNAME TASKNAME] DBCONN Import all or selected tasks from database DBCONN (duplicates not checked)
26 setstatus DSNAME TASKNAME STATUS Set status of selected tasks to STATUS
27 setfield DSNAME TASKNAME FIELDNAME VALUE Set database field to value (experts only!)
28 delete DSNAME TASKNAME Delete entry for selected task(s) (task files are NOT removed)
29 deleteResults DSNAME TASKNAME Delete postprocessing results for selected set of tasks
30 deleteTask DSNAME TASKNAME Delete task files and corresponding TaskManager entry
31 for a single task (will prompt for confirmation unless
32 option --batch is used)
38 from InDetBeamSpotExample.Utils
import getRunFromName
40 from future
import standard_library
41 standard_library.install_aliases()
45 from optparse
import OptionParser
46 parser = OptionParser(usage=__usage__, version=__version__)
48 parser.add_option(
'-b',
'--batch', dest=
'batch', action=
'store_true', default=
False, help=
'batch mode - never ask for confirmation')
49 parser.add_option(
'-v',
'--verbose', dest=
'verbose', action=
'store_true', default=
False, help=
'enable more verbose output for some commands')
50 parser.add_option(
'-n',
'--nowildcards', dest=
'nowildcards', action=
'store_true', default=
False, help=
'do not add wildcards when looking up dataset and task names')
51 parser.add_option(
'-d',
'--dbconn', dest=
'dbconn', default=
'', help=
'task manager database connection string (default: check TASKDB, otherwise use sqlite_file:taskdata.db)')
52 parser.add_option(
'',
'--proddir', dest=
'proddir', default=
'.', help=
'production directory (default: "."')
53 parser.add_option(
'-p',
'--pretty', dest=
'pretty', action=
'store_true', default=
None, help=
'try to nicely format output (default: auto)')
54 parser.add_option(
'',
'--no-pretty', dest=
'pretty', action=
'store_false', help=
'do not attempt to format output')
55 parser.add_option(
'',
'--runtaskname', dest=
'runtaskname', default=
'CB_BEAMSPOT', help=
'task name')
56 (options,args) = parser.parse_args()
59 parser.error(
'wrong number of command line arguments')
63 proddir = options.proddir
64 if not os.path.exists(proddir):
65 sys.exit(
'ERROR: Job directory %s does not exist or is unreadable' % proddir)
68 ''' Open task manager '''
69 dbconn = dbconn
or options.dbconn
71 return TaskManager(dbconn)
73 print (
'ERROR: Unable to access task manager database %s' % dbconn)
80 if cmdargs: parser.error(
'Command init does not take arguments')
82 dbtype, dbname = TaskManager.parseConnectionInfo(options.dbconn)
83 except ValueError
as e:
84 sys.exit(
'ERROR: {}'.
format(e))
85 print (
'Will initialise schema for database: {}:{}'.
format(dbtype, dbname))
88 print (
'Checking for existing database ...')
90 with TaskManager(options.dbconn):
pass
91 except ValueError
as e:
92 print (
'Test connection failed: {}'.
format(e))
94 print (
'Test connection succeeded: the database already exists!')
97 a = raw_input(
'\nRECREATING TASK DATABASE SCHEMA - ANY EXISTING DATA WILL BE ERASED!\n\nARE YOU REALLY ABSOLUTELY SURE [n] ? ')
98 if a !=
'y': sys.exit(
'ERROR: Rebuilding aborted by user')
100 with TaskManager(options.dbconn, createDatabase=
True):
pass
108 if cmdargs: parser.error(
'Command checkdb does not take arguments')
110 dbtype, dbfile = TaskManager.parseConnectionInfo(options.dbconn)
112 sys.exit(
"ERROR: Illegal or empty/default database connection string - must provide explicity SQLite file reference")
113 if dbtype !=
'sqlite_file':
114 sys.exit(
'ERROR: checkdb is only supported for SQLite databases')
115 if not os.path.exists(dbfile):
116 sys.exit(
'ERROR: SQLite file %s does not exist' % dbfile)
118 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'pragma integrity_check;'" % dbfile)
120 sys.exit(
'ERROR: Error executing sqlite3 command')
122 print (
'ERROR: SQLite database file has errors:')
126 if not options.batch:
127 a = raw_input(
'Do you want to try VACUUM [n] ? ')
129 sys.exit(
'\nERROR: VACUUM not executed - please fix database file manually')
130 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'vacuum;'" % dbfile)
133 sys.exit(
'ERROR: VACUUM failed')
135 (status, output) = subprocess.getstatusoutput(
"sqlite3 %s 'pragma integrity_check;'" % dbfile)
136 if status != 0
or output !=
'ok':
138 sys.exit(
'ERROR: Integrity check still failed - please check')
139 print (
'INFO: SQLite file now passes integrity test (content may still have errors)')
142 print (
'INFO: SQLite file {} ok'.
format(dbfile))
149 if cmd ==
'checkdup':
150 if cmdargs: parser.error(
'Command checkdup does not take arguments')
154 for t
in taskman.taskIter(
'dsname,taskname,count(*)', [
'group by dsname,taskname']):
157 print (
'Duplicate task: {} / {}'.
format(t[0], t[1]))
158 for d
in taskman.taskIterDict(qual=[
159 'where DSNAME =', DbParam(t[0]),
160 'and TASKNAME =', DbParam(t[1]),
162 print (
' TASKID =', d[
'TASKID'])
163 print (nDuplicates,
'duplicates found')
173 print (taskman.getNTasks(),
'task(s) found:\n')
175 for t
in taskman.taskIterDict():
176 print (
'\n\nTask {} / {}:\n'.
format(
179 print (pprint.pformat(t))
181 for t
in taskman.taskIter():
185 elif len(cmdargs) == 2:
187 taskname = cmdargs[1]
192 addWildCards=
not options.nowildcards)
193 except TaskManagerCheckError
as e:
196 taskEntry = taskman.getTaskDict(t[0], t[1])
197 if options.pretty
is None or options.pretty:
198 print (
'\n\nTask {} / {}:\n'.
format(
200 taskEntry[
'TASKNAME']))
201 print (pprint.pformat(taskEntry))
206 else: parser.error(
'Command dump takes either 0 or 2 arguments')
212 if cmd==
'show' and (len(args)==2
or len(args)==3):
214 taskname = args[2]
if len(args)>2
else ''
217 taskList =
getFullTaskNames(taskman,dsname,taskname,addWildCards=
not options.nowildcards)
219 print (
" %-50s %s" % (
'DATASET NAME',
'TASK NAME'))
220 print (
" %s" % (75*
'-'))
222 print (
" %-50s %s" % (t[0],t[1]) )
223 print (
'\n%i task(s) found\n' % len(taskList))
225 except TaskManagerCheckError
as e:
233 if cmd==
'update' and len(args)==1:
234 if cmdargs: parser.error(
'Command update does not take any arguments')
237 for t
in taskman.taskIterDict(
'DSNAME,TASKNAME', [
238 'where STATUS < {:d} and ONDISK < {:d}'.
format(
239 TaskManager.StatusCodes[
'POSTPROCESSING'],
240 TaskManager.OnDiskCodes[
'ARCHIVED'])]):
241 a = TaskAnalyzer(proddir, t[
'DSNAME'], t[
'TASKNAME'])
243 print (
'Updating task {}/{}'.
format(t[
'DSNAME'], t[
'TASKNAME']))
244 a.updateStatus(taskman)
246 taskman.setDiskStatus(t[
'DSNAME'], t[
'TASKNAME'],
247 TaskManager.OnDiskCodes[
'DELETED'])
250 if cmd==
'update' and len(args)==3:
253 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
254 except TaskManagerCheckError
as e:
258 a = TaskAnalyzer(proddir,t[0],t[1])
260 print (
'Updating task %s/%s' % (t[0],t[0]))
261 a.updateStatus(taskman)
263 taskman.setDiskStatus(t[0],t[0],TaskManager.OnDiskCodes[
'DELETED'])
271 if len(cmdargs) == 2:
273 taskname = cmdargs[1]
274 taskpath = os.path.join(proddir, dsname, taskname)
276 taskpath = os.path.join(proddir,
'*',
'*')
278 parser.error(
'Command rebuild takes 0 or 2 arguments')
280 print (
'Will reconstruct database from directory: {}'.
format(taskpath))
285 if not options.batch:
286 print (
'Rebuilding the following tasks:')
288 print (
' DB {:50} {}'.
format(
'DATASET NAME',
'TASK NAME'))
289 print (
' {}'.
format(85 *
'-'))
290 for p
in glob.glob(taskpath):
291 if not os.path.isdir(p):
continue
292 (dsName, taskName) = p.split(
'/')[-2:]
293 nDefined = taskman.getNTasks([
294 'where DSNAME=', DbParam(dsName),
295 'and TASKNAME=', DbParam(taskName)])
296 dirs.append((dsName, taskName, nDefined))
299 print (
' {:50} {}'.
format(dsName, taskName))
300 elif options.verbose:
301 print (
' * {:50} {}'.
format(dsName, taskName))
303 a = raw_input(
'\nARE YOU SURE [n] ? ')
305 sys.exit(
'ERROR: Rebuilding aborted by user')
308 for (dsName, taskName, nDefined)
in dirs:
310 print (
'Adding missing entry for task {}/{}'.
format(dsName, taskName))
311 configFile = glob.glob(os.path.join(proddir, dsName, taskName,
'*',
'*.config.py'))
313 mtime = os.stat(configFile[0])[stat.ST_MTIME]
317 template = config[
'jobConfig'][
'joboptionpath']
321 release = config[
'jobConfig'][
'release']
325 taskpostprocsteps = config[
'jobConfig'][
'taskpostprocsteps']
327 taskpostprocsteps =
''
329 comment = config[
'jobConfig'][
'comment'] +
' (rebuilt from config file)'
331 comment =
'rebuilt from config file'
332 taskman.addTask(dsName, taskName, template, release, 0, taskpostprocsteps,
334 createdUser=
'UNKNOWN',
335 createdHost=
'UNKNOWN',
338 print (
'WARNING: no config file found, unable to determine task creation time and configuration details')
340 taskman.addTask(dsName, taskName,
'UNKNOWN',
'UNKNOWN', 0,
341 comment=
'rebuilt, no config file found')
342 a = TaskAnalyzer(proddir, dsName, taskName)
344 a.updateStatus(taskman)
346 print (
'Entry exists already for task {}/{} - SKIPPED'.
format(dsName, taskName))
348 print (
'ERROR: {:d} conflicting task entries for task {}/{} - SKIPPED'.
format(nDefined, dsName, taskName))
356 if len(cmdargs) == 1:
357 fromDbconn = cmdargs[0]
359 elif len(cmdargs) == 3:
361 taskname = cmdargs[1]
362 fromDbconn = cmdargs[2]
366 wildcards = string.maketrans(
'*?',
'%_')
368 if '*' in dsname
or '?' in dsname:
369 qual.append(
"where DSNAME like '{}'".
format(dsname.translate(wildcards)))
371 qual.extend([
'where DSNAME =', DbParam(dsname)])
372 if '*' in taskname
or '?' in taskname:
373 qual.append(
"and TASKNAME like '{}'".
format(taskname.translate(wildcards)))
375 qual.extend([
'and TASKNAME =', DbParam(taskname)])
377 parser.error(
'Command import takes 1 or 3 arguments')
380 destDbtype, destDbname = TaskManager.parseConnectionInfo(options.dbconn)
381 fromDbtype, fromDbname = TaskManager.parseConnectionInfo(fromDbconn)
382 except ValueError
as e:
383 sys.exit(
'ERROR: Bad database connection string {}'.
format(e))
385 print (
'Import from: {}:{}'.
format(fromDbtype, fromDbname))
386 print (
'Import into: {}:{}'.
format(destDbtype, destDbname))
392 qual.append(
'order by RUNNR desc')
393 for t
in fromman.taskIterDict(qual=qual):
394 print (
'Importing ', t[
'DSNAME'],
'/' , t[
'TASKNAME'],
'...')
395 if not 'RUNNR' in t.keys():
398 if not t.get(
'RESULTLINKS'):
401 files = t.get(
'RESULTFILES',
'')
405 taskname = t[
'TASKNAME']
406 summaryFiles = glob.glob(
'%s/%s/*beamspot.gif' % (dsname,taskname))
407 if len(summaryFiles)>0:
408 for r
in summaryFiles:
411 if not f
in files.split():
412 files =
' '.
join([files,f])
413 if not pdf
in files.split():
414 files =
' '.
join([files,pdf])
415 links +=
' <a href="../files?u=%s/%s/%s">summary</a>' %(dsname,taskname,f)
416 links +=
' (<a href="/jobfiles/%s/%s/%s">pdf</a>)' %(dsname,taskname,pdf)
417 monitoringFiles = glob.glob(
'%s/%s/*beamspotmon.gif' % (dsname,taskname))
418 if len(monitoringFiles)>0:
419 for r
in monitoringFiles:
422 if not f
in files.split():
423 files =
' '.
join([files,f])
424 if not pdf
in files.split():
425 files =
' '.
join([files,pdf])
426 links +=
' <a href="../files?u=%s/%s/%s">monitoring</a>' %(dsname,taskname,f)
427 links +=
' (<a href="/jobfiles/%s/%s/%s">pdf</a>)' %(dsname,taskname,pdf)
428 t[
'RESULTFILES'] = files
429 t[
'RESULTLINKS'] = links
433 for fieldName
in t.keys():
434 newFieldName = fieldName
435 if fieldName ==
'TASKID':
437 elif fieldName ==
'COMMENT':
438 newFieldName =
'TASKCOMMENT'
440 fieldNameString +=
','
442 fieldNameString += newFieldName
443 params.append(DbParam(t[fieldName]))
444 q = [
'insert into TASKS (%s) values (' % fieldNameString ]
447 taskman.execute(q,
True)
450 print (
'{:d} tasks imported'.
format(nImported))
457 if cmd ==
'setstatus':
458 if len(cmdargs) != 3: parser.error(
'Command setstatus takes 3 arguments')
460 taskname = cmdargs[1]
463 status = TaskManager.StatusCodes.get(statusName)
465 print (
'Setting task status to {} (code {:d})'.
format(statusName,status))
467 sys.exit(
'ERROR: Illegal status code name {}'.
format(statusName))
473 confirmWithUser=
not options.batch,
474 addWildCards=
not options.nowildcards)
475 except TaskManagerCheckError
as e:
480 n += taskman.setValue(t[0], t[1],
'STATUS', status)
481 print (
'%i task status entries updated\n' % (n))
488 if cmd ==
'setfield':
489 if len(cmdargs) != 4: parser.error(
'Command setfield takes 4 arguments')
491 taskname = cmdargs[1]
493 fieldValue = cmdargs[3]
495 print (
'Setting field %s to: %s' % (fieldName,fieldValue))
500 requireSingleTask=
True,
501 confirmWithUser=
not options.batch,
502 addWildCards=
not options.nowildcards)
503 except TaskManagerCheckError
as e:
505 n = taskman.setValue(dsname,task,fieldName,fieldValue)
506 print (
'{:d} task status entries updated'.
format(n))
513 if cmd==
'delete' and len(args)==3:
516 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
517 except TaskManagerCheckError
as e:
522 n += taskman.deleteTask(t[0],t[1])
523 print (
'\n%i task entries deleted\n' % (n))
528 if cmd==
'listResults' and len(args)==3:
531 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
532 except TaskManagerCheckError
as e:
536 print (
'Listing results for task %s/%s ...' % (t[0],t[1]) )
537 print (taskman.getTaskValue(t[0],t[1],
'RESULTFILES'))
538 print (taskman.getTaskValue(t[0],t[1],
'RESULTLINKS'))
546 if cmd==
'deleteResults' and len(args)==3:
549 taskList =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,addWildCards=
not options.nowildcards)
550 except TaskManagerCheckError
as e:
554 print (
'Deleting results for task %s/%s ...' % (t[0],t[1]) )
555 taskman.setValue(t[0],t[1],
'RESULTFILES',
'')
556 taskman.setValue(t[0],t[1],
'RESULTLINKS',
'')
563 if cmd==
'deleteTask' and len(args)==3:
566 [(dsname,task)] =
getFullTaskNames(taskman,args[1],args[2],confirmWithUser=
not options.batch,requireSingleTask=
True,addWildCards=
not options.nowildcards)
567 except TaskManagerCheckError
as e:
570 dir =
'/'.
join([proddir,dsname,task])
572 if os.path.exists(dir):
573 os.system(
'du -hs %s' % dir)
575 print (
'No task files found (no directory %s)' % dir)
576 if not options.batch:
577 a =
input(
'\nDeleteing task entry and all files - ARE YOU SURE [n] ? ')
579 print (
"ERROR: Deletion aborted by user.")
581 n = taskman.deleteTask(dsname,task)
582 print (
'\n%i task entry deleted.' % (n))
583 print (
'Deleting ',dir,
' ...')
584 os.system(
'rm -rf %s' % dir)
587 if cmd==
'deleteTask' and len(args)==2:
592 taskList =
getFullTaskNames(taskman,dsname,taskname,addWildCards=
not options.nowildcards)
593 except TaskManagerCheckError
as e:
601 print (
'\n%i task(s) found\n' % len(taskList))
605 print (
" %-50s %s" % (dsname,taskname) )
606 dir =
'/'.
join([proddir,dsname,taskname])
608 if os.path.exists(dir):
609 os.system(
'du -hs %s' % dir)
611 print (
'No task files found (no directory %s)' % dir)
612 if not options.batch:
613 a =
input(
'\nDeleteing task entry and all files - ARE YOU SURE [n] ? ')
615 print (
"ERROR: Deletion aborted by user.")
617 n += taskman.deleteTask(dsname,taskname)
618 print (
'Deleting ',dir,
' ...')
619 os.system(
'rm -rf %s' % dir)
620 print (
'\n%i tasks deleted.' % (n))
623 if cmd==
'notifyFailed' and len(args)<3:
625 earliestUpdateTime = time.time()-
float(args[1])
627 earliestUpdateTime = 0
631 for t
in taskman.taskIterDict(
'*',[
"where TASKNAME like '%%%s%%'" % options.runtaskname,
632 "and UPDATED >= ", DbParam(earliestUpdateTime),
633 "and (STATUS = %i or STATUS = %i)" %(TaskManager.StatusCodes[
'FAILED'], TaskManager.StatusCodes[
'POSTPROCFAILED']),
634 "order by RUNNR desc"]):
637 t[
'STATUS'] = [k
for k, v
in TaskManager.StatusCodes.items()
if v == t[
'STATUS']][0]
642 dsWidth = len(
max([t[
'DSNAME']
for t
in taskList],key=len))
643 taskWidth = len(
max([t[
'TASKNAME']
for t
in taskList],key=len))
644 statusWidth = len(
max([t[
'STATUS']
for t
in taskList],key=len))
646 bodyFormat =
'%%(DSNAME)%ss %%(TASKNAME)%ss %%(STATUS)%ss' %(dsWidth, taskWidth, statusWidth)
648 mailBody =
'The following %s tasks have reported failures' % len(taskList)
651 mailBody +=
' in the last %s hours' % hours
653 mailBody +=
'\n'.
join([bodyFormat % t
for t
in taskList])
654 stat = os.system(
'source /afs/cern.ch/user/a/atlidbs/cron/mailwrapper; mailwrapper "[atlidbs] %s tasks report failures" "%s"' %(len(taskList), mailBody)) >> 8
657 print (
'\nERROR: Unable to send mail\n')
667 if cmd==
'debug' and len(args)==1:
670 os.environ[
'PYTHONINSPECT'] =
'1'
672 print (
'Entering python shell. taskman is the TaskManager object. Be careful!')
675 print (
'ERROR: Illegal command or number of arguments')