ATLAS Offline Software
TaskManager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
4 
5 """
6 TaskManager is a tool for keeping track of JobRunner jobs using a
7 database. TaskManager uses the notion of a task as the primary unit for
8 bookkeeping. Each task includes one or more jobs run using the same
9 JobRunner template and typically resulting from a single invocation of
10 JobRunner through one of the makeDPD.py or runDPD.py
11 scripts. TaskManager keeps track of the status of tasks and associated
12 jobs, runs postprocessing scripts, and accumulates results for later
13 analysis across tasks.
14 
15 Written by Juerg Beringer (LBNL) in 2009.
16 """
17 __author__ = 'Juerg Beringer'
18 __version__ = 'TaskManager.py atlas/athena'
19 
20 import time, os, glob
21 
22 from InDetBeamSpotExample.Utils import getRunFromName
23 from InDetBeamSpotExample.Utils import getUserName
24 import six
25 
26 
27 # Exception classes
class TaskManagerCheckError(Exception):
    """Exception raised by the task manager when a task check fails."""
30 
class TaskManagerDatabaseError(Exception):
    """Exception raised by the task manager when a database operation fails."""
33 
34 
def dictFactory(cursor, row):
    """Convert a row tuple from a database query into a dictionary.

    The keys are the column names, i.e. the first element of each entry in
    cursor.description; the values are the entries of row at the same position.
    """
    columnNames = [entry[0] for entry in cursor.description]
    return {name: row[pos] for pos, name in enumerate(columnNames)}
41 
42 
def getKey(d, v):
    """Get the key for which dictionary d has an entry with value v.

    Returns 'Undefined' if there is no such key or if several keys have value v.
    """
    matches = [key for key, value in d.items() if value == v]
    return matches[0] if len(matches) == 1 else 'Undefined'
51 
52 
def getStatusClass(status):
    """Return 'ok', 'warn' or 'bad' depending on the value of status.

    Status 10 maps to 'ok', 7 and 11 map to 'bad', and every other value
    maps to 'warn'.
    """
    return {7: 'bad', 10: 'ok', 11: 'bad'}.get(status, 'warn')
64 
65 
def appendUnique(s,v):
    """Append word v to the whitespace-separated string s unless s already contains it.

    A false s (e.g. None or '') is treated as the empty string. The new word is
    joined with a single blank, so appending to an empty string yields ' ' + v.
    Returns the resulting string.
    """
    current = s if s else ''   # make sure it is not None
    if v in current.split():
        return current
    return current + ' ' + v
72 
73 
def getFullTaskNames(taskman,dsname,taskname,requireSingleTask=False,confirmWithUser=False,addWildCards=True):
    """Resolve a (dsname,taskname) pair to the list of matching full (DSNAME,TASKNAME) pairs.

    dsname and taskname may contain wildcards or be partial names such as a run
    number; the lookup itself is done by taskman.getTaskNames. A
    TaskManagerCheckError is raised if nothing matches, if requireSingleTask is
    set and more than one task matches, or if confirmWithUser is set and the user
    does not answer 'y' at the prompt. Otherwise the list of matches is returned.
    """
    taskList = taskman.getTaskNames(dsname,taskname,addWildCards)
    nTasks = len(taskList)

    if nTasks==0:
        raise TaskManagerCheckError ('ERROR: No tasks found for dataset = %s, task = %s' % (dsname,taskname))

    if requireSingleTask and nTasks!=1:
        # Assemble the message from its pieces: explanation, table header, one row per task
        pieces = ["ERROR: Multiple data set names found for dataset = %s, task = %s\n Please use full dataset or task name from list below, using option -n if necessary:\n\n" % (dsname,taskname),
                  " %-50s %s\n" % ('DATASET NAME','TASK NAME'),
                  " %s\n" % (75*'-')]
        pieces.extend(" %-50s %s\n" % (t[0],t[1]) for t in taskList)
        pieces.append('\n')
        raise TaskManagerCheckError (''.join(pieces))

    if confirmWithUser:
        print ('Please confirm that you want to execute this command for the following tasks:\n')
        print (" %-50s %s" % ('DATASET NAME','TASK NAME'))
        print (" %s" % (75*'-'))
        for t in taskList:
            print (" %-50s %s" % (t[0],t[1]))
        answer = input('\nARE YOU SURE [n] ? ')
        if answer!='y':
            raise TaskManagerCheckError ('ERROR: Aborted by user')
        print()

    return taskList
101 
102 
def getJobConfig(jobDir,dsName,taskName,jobName='*'):
    """Read the jobConfig dictionary from the config file of a job.

    Looks for <jobDir>/<dsName>/<taskName>/<jobName>/*.config.py.final.py and,
    if no such file exists, for *.config.py in the same directory. jobName may
    be a glob pattern; the first file returned by glob is used. The file is
    executed and the object it binds to the name jobConfig is returned.

    Raises KeyError if no config file is found or if the file does not define
    jobConfig.
    """
    config = {}
    jobPath = '%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName)
    configFile = glob.glob(jobPath+'/*.config.py.final.py')
    if not configFile:
        configFile = glob.glob(jobPath+'/*.config.py')
    if configFile:
        # NOTE(review): exec runs arbitrary Python code from the config file -
        # only use this on job directories from a trusted source.
        with open(configFile[0]) as f:   # close the file handle deterministically
            source = f.read()
        exec(source,config)   # Eval config file and put defs into config dict
    return config['jobConfig']
112 
113 
class DbParam(object):
    """A utility class to hold a single parameter to be used in SQL queries."""

    def __init__(self,value):
        """Store value, the parameter to be bound when the statement is executed."""
        self.value = value

    def __repr__(self):
        """Return the repr of the wrapped value."""
        # Use repr() instead of '%r' % value: the % operator unpacks tuple values,
        # which fails for tuples that do not have exactly one element.
        return repr(self.value)
120 
121 
class TaskManager:
    """TaskManager is a tool for keeping track of JobRunner jobs."""

    # Numeric codes stored in the STATUS column of the TASKS table
    StatusCodes = { 'UNKNOWN': -1,
                    'CONFIGURED': 0,
                    'SUBMITTED': 1,
                    'RUNNING': 2,
                    'PARTIALFAILED': 3,
                    # Codes above are considered tasks whose status needs to
                    # be monitored and updated
                    'POSTPROCESSING': 5,
                    'POSTPROCRUNNING': 6,
                    'POSTPROCFAILED': 7,
                    'COMPLETED': 10,
                    'FAILED': 11}

    # Numeric codes stored in the ONDISK column of the TASKS table
    OnDiskCodes = { 'UNKNOWN': -1,
                    'ALLONDISK': 0,
                    'RESULTSONDISK': 1,
                    # Codes above are for jobs whose task directory is on disk
                    'ARCHIVED': 10,
                    'DELETED': 11 }
144 
145  @classmethod
146  def parseConnectionInfo(self, connstring=''):
147  if not connstring:
148  connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')
149 
150  try:
151  dbtype, dbname = connstring.split(':', 1)
152  except Exception:
153  raise ValueError ('Illegal database connection string {}'.format(connstring))
154 
155  if dbtype == 'auth_file':
156  # dbname is a file with the actual connection information
157  authfile = dbname
158  try:
159  with open(authfile) as af:
160  connstring = af.read().strip()
161  dbtype, dbname = connstring.split(':', 1)
162  except Exception:
163  raise ValueError ('Invalid authorization file {} (not readable or invalid format)'.format(authfile))
164 
165  return dbtype, dbname
166 
167 
168  def __init__(self, connstring='', createDatabase=False):
169  """Constructor. connstring is a connection string specifying either a SQLite file
170  ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
171  ("auth_file:...") with connection information. If connstring is empty, the connection
172  information will be taken from TASKDB (if set), or otherwise defaults to
173  'sqlite_file:taskdata.db'."""
174 
175  self.debug = False
176  self.paramstyle = None
177  self.is_managing_context = False
178 
179  self.dbtype, self.dbname = self.__class__.parseConnectionInfo(connstring)
180 
181  if self.dbtype == 'sqlite_file':
182  import sqlite3
183  self.paramstyle = 'qmark'
184  dbexists = os.access(self.dbname, os.F_OK)
185  if dbexists and createDatabase:
186  raise ValueError ('SQLite file {} exists already - remove before recreating'.format(self.dbname))
187  if not (dbexists or createDatabase):
188  raise ValueError ('TaskManager database not found (SQLite file {})'.format(self.dbname))
189  self.dbcon = sqlite3.connect(self.dbname)
190  if createDatabase:
191  self._createSQLiteSchema()
192  elif self.dbtype == 'oracle':
193  import cx_Oracle
194  self.paramstyle = 'named'
195  try:
196  self.dbcon = cx_Oracle.connect(self.dbname)
197  except Exception:
198  print ('ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
199  time.sleep(10)
200  self.dbcon = cx_Oracle.connect(self.dbname)
201  if createDatabase:
202  self._createOracleSchema()
203  else:
204  raise ValueError ('Unknown database type: {}'.format(self.dbtype))
205 
206  def __enter__(self):
207  ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
208  self.is_managing_context = True
209  return self
210 
211  def __exit__(self, exc_type, exc_value, traceback):
212  ''' Close the database connection at the end of the 'with' statement. '''
213  try:
214  self.dbcon.close()
215  except Exception:
216  print ('ERROR: Unable to close database connection')
217 
219  """Create the database schema for a SQLite3 database."""
220  self.dbcon.cursor().executescript("""
221  create table tasks (
222  TASKID integer primary key autoincrement,
223  DSNAME text,
224  RUNNR integer,
225  TASKNAME text,
226  TEMPLATE text,
227  TASKPOSTPROCSTEPS text,
228  ATLREL text,
229  CREATED real,
230  CREATED_USER text,
231  CREATED_HOST text,
232  UPDATED real,
233  STATUS integer,
234  ONDISK integer,
235  LOGFILES text,
236  COOLTAGS text,
237  NRESULTFILES integer default 0,
238  NJOBS integer default 0,
239  NJOBS_SUBMITTED integer default -1,
240  NJOBS_RUNNING integer default -1,
241  NJOBS_POSTPROC integer default -1,
242  NJOBS_FAILED integer default -1,
243  NJOBS_COMPLETED integer default -1,
244  RESULTFILES text,
245  RESULTLINKS text,
246  TASKCOMMENT text,
247  STATUS_POSTPROC integer,
248  BASEPATH text,
249  AUXDATA text
250  );
251  """)
252  self.dbcon.commit()
253 
254 
256  """Create the database schema for an Oracle database."""
257  try:
258  self.dbcon.cursor().execute('drop sequence TASKS_SEQ')
259  self.dbcon.cursor().execute('drop trigger TASKS_ID_TRIGGER')
260  self.dbcon.cursor().execute('drop table TASKS')
261  except Exception as e:
262  print (e)
263  self.dbcon.cursor().execute("""
264  create table tasks (
265  TASKID number(10,0),
266  DSNAME varchar2(256),
267  RUNNR number(10,0),
268  TASKNAME varchar2(256),
269  TEMPLATE varchar2(256),
270  TASKPOSTPROCSTEPS varchar2(256),
271  ATLREL varchar2(256),
272  CREATED number,
273  CREATED_USER varchar2(256),
274  CREATED_HOST varchar2(256),
275  UPDATED number,
276  STATUS number(5,0),
277  ONDISK number(5,0),
278  LOGFILES varchar2(256),
279  COOLTAGS varchar2(256),
280  NRESULTFILES number(10,0) default 0,
281  NJOBS number(10,0) default 0,
282  NJOBS_SUBMITTED number(10,0) default -1,
283  NJOBS_RUNNING number(10,0) default -1,
284  NJOBS_POSTPROC number(10,0) default -1,
285  NJOBS_FAILED number(10,0) default -1,
286  NJOBS_COMPLETED number(10,0) default -1,
287  RESULTFILES varchar2(4000),
288  RESULTLINKS varchar2(4000),
289  TASKCOMMENT varchar2(4000),
290  STATUS_POSTPROC number(5,0) default -1,
291  BASEPATH varchar2(256),
292  AUXDATA varchar2(4000),
293  constraint BEAMSPOT_TASKS_PK primary key (TASKID)
294  )
295  """)
296  self.dbcon.cursor().execute('create index TASKS_DSNAME_INDX on TASKS(DSNAME)')
297  self.dbcon.cursor().execute('create index TASKS_RUNNR_INDX on TASKS(RUNNR)')
298  self.dbcon.cursor().execute('alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
299  self.dbcon.cursor().execute('create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
300  # The following statement must not have whitespace at the beginning of the line, otherwise
301  # the trigger function won't be parsed correctly:
302  self.dbcon.cursor().execute("""
303 create or replace trigger TASKS_ID_TRIGGER
304 before insert on TASKS
305 for each row
306 begin
307  select TASKS_SEQ.nextval into :new.TASKID from dual;
308 end;
309 """)
310  self.dbcon.commit()
311 
312 
313  def execute(self,statementParts, doCommit=False, limit=None):
314  """Execute a SQL statement passed as a list or tuple of statement parts, where each
315  part is either a partial SQL string, or an object of type DbParam specifying a parameter
316  for the SQL statement. The actual SQL statement is assembled (using the parameter style
317  of the current database) and executed, and the resulting cursor is returned.
318  Loosely follows the method discussed in the Python Cookbook.
319  WARNING: At present, limit doesn't work when ordering rows for Oracle!"""
320  if not self.is_managing_context:
321  print ('**WARN** TaskManager will keep the database connection open until it is deleted.')
322  print ('**INFO** TaskManager should generally only be used inside a with statement:')
323  print ('**INFO** with TaskManager(...) as taskman:')
324  print ('**INFO** # do something ...')
325 
326  if not statementParts:
327  return None
328  if isinstance(statementParts,str):
329  raise TypeError ('Must pass list or tuple to TaskManager.execute')
330  for p in statementParts:
331  if not (isinstance(p,DbParam) or isinstance(p,six.string_types)):
332  raise ValueError ('Can only pass SQL string fragments and DbParam objects in list'
333  'to TaskManager.execute, found %s with "%s"' % (type(p),str(p)))
334  sqlParts = None
335  params = None
336 
337  if self.paramstyle=='qmark':
338  sqlParts = []
339  params = []
340  for p in statementParts:
341  if isinstance(p,DbParam):
342  sqlParts.append('?')
343  params.append(p.value)
344  else:
345  sqlParts.append(p)
346 
347  if self.paramstyle=='named':
348  sqlParts = []
349  params = {}
350  for p in statementParts:
351  if isinstance(p,DbParam):
352  name = 'p%d' % len(params)
353  sqlParts.append(':%s' % name)
354  params[name] = p.value
355  else:
356  sqlParts.append(p)
357 
358  if sqlParts is None:
359  raise ValueError ('Unknown SQL parameter style %s' % self.paramstyle)
360  return None
361  sql = ' '.join(sqlParts)
362 
363  # Handle database-dependent limit clause (PRELIMINARY)
364  if limit:
365  if self.dbtype=='oracle':
366  # WARNING: the following doesn't work always when ordering rows!
367  if 'order by' in sql: # WARNING: works only if order by is in lowercase!
368  sqlParts2 = sql.split('order by')
369  sql = '%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
370  else:
371  sql += ' and ROWNUM <= %i' % limit
372  else:
373  sql += ' limit %i' % limit
374 
375  if self.debug:
376  print ('\nExecuting SQL statement: ',sql)
377  print (' with parameters: ',params)
378 
379  cursor = self.dbcon.cursor()
380  try:
381  cursor.execute(sql,params)
382  if doCommit:
383  self.dbcon.commit()
384  except Exception as e:
385  msg = '\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
386  raise TaskManagerDatabaseError (msg)
387  return cursor
388 
389 
390  def deleteTask(self,dsName,taskName):
391  """Delete a task entry from the taskmanager database."""
392  q = [ 'delete from TASKS where DSNAME =',DbParam(dsName),'and TASKNAME =',DbParam(taskName) ]
393  cursor = self.execute(q,True)
394  return cursor.rowcount
395 
396 
397  def addTask(self,dsName,taskName,template,release,njobs,
398  taskpostprocsteps='',
399  status=StatusCodes['SUBMITTED'],
400  onDisk=OnDiskCodes['ALLONDISK'],
401  createdTime=None,
402  createdUser=None,
403  createdHost=None,
404  comment=''):
405  """Add an entry for a new task if the task doesn't exist already. If the task exists,
406  its UPDATED, NJOBS, STATUS and ONDISK fields will be updated."""
407  tstamp = time.time()
408  nDefined = self.getNTasks(['where DSNAME =',DbParam(dsName),'and TASKNAME=',DbParam(taskName)])
409  updateStatus = self.getNTasks(['where DSNAME = ',DbParam(dsName),'and TASKNAME=',DbParam(taskName),'and STATUS < %i' % TaskManager.StatusCodes['POSTPROCRUNNING']])
410 
411  if nDefined:
412  # Task entry exists already; only update UPDATED, NJOBS, STATUS, ONDISK and possibly ATLREL
413  task = self.getTaskDict(dsName,taskName)
414  if task['TEMPLATE']!=template:
415  print ('ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task['TEMPLATE'],template))
416  else:
417  updateStr = ['update TASKS set UPDATED =',DbParam(tstamp),
418  ', NJOBS = ',DbParam(task['NJOBS']+njobs),
419  ', ONDISK = ',DbParam(onDisk)]
420 
421  if release not in task['ATLREL']:
422  print ('WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task['ATLREL'],release))
423  release = '; '.join([task['ATLREL'],release])
424  updateStr += [', ATLREL = ',DbParam(release)]
425 
426  if updateStatus:
427  updateStr += [', STATUS = ',DbParam(status)]
428 
429  updateStr += ['where DSNAME = ',DbParam(dsName),'and TASKNAME = ',DbParam(taskName)]
430 
431  self.execute( updateStr)
432 
433  else:
434  # New task entry
435  if not createdTime:
436  createdTime = tstamp
437  if not createdUser:
438  createdUser = getUserName()
439  if not createdHost:
440  createdHost = os.uname()[1]
441  runnr = getRunFromName(dsName,None,True)
442  self.execute( ['insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
443  DbParam(dsName),',',
444  DbParam(runnr),',',
445  DbParam(taskName),',',
446  DbParam(template),',',
447  DbParam(taskpostprocsteps),',',
448  DbParam(release),',',
449  DbParam(createdTime),',',
450  DbParam(createdUser),',',
451  DbParam(createdHost),',',
452  DbParam(tstamp),',',
453  DbParam(status),',',
454  DbParam(onDisk),',',
455  DbParam(njobs),',',
456  DbParam(comment),')'],
457  True)
458 
459 
460  def getStatus(self,dsName,taskName):
461  """Get status of a task."""
462  cursor = self.execute( ['select STATUS from TASKS where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName) ] )
463  return cursor.fetchone()[0]
464 
465 
466  def setStatus(self,dsName,taskName,status,oldStatus=None):
467  """Set the status of a task. If oldStatus is set, the task must be in the designated status."""
468  tstamp = time.time()
469  if oldStatus:
470  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
471  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName),'and STATUS =',DbParam(oldStatus)], True)
472  else:
473  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
474  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
475  return cursor.rowcount
476 
477 
478  def setDiskStatus(self,dsName,taskName,onDisk):
479  """Update the onDisk-status of a task."""
480  tstamp = time.time()
481  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', ONDISK =',DbParam(onDisk),
482  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
483  return cursor.rowcount
484 
485 
486  def updateStatus(self,dsName,taskName,
487  status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
488  """Update task status information including number of jobs in different states."""
489  tstamp = time.time()
490  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),
491  ', STATUS =',DbParam(status),
492  ', NRESULTFILES =',DbParam(nResultFiles),
493  ', NJOBS =',DbParam(nJobs),
494  ', NJOBS_SUBMITTED =',DbParam(nJobsSubmitted),
495  ', NJOBS_RUNNING =',DbParam(nJobsRunning),
496  ', NJOBS_POSTPROC =',DbParam(nJobsPostProc),
497  ', NJOBS_FAILED =',DbParam(nJobsFailed),
498  ', NJOBS_COMPLETED =',DbParam(nJobsCompleted),
499  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
500  return cursor.rowcount
501 
502 
503  def setValue(self,dsName,taskName,fieldName,value):
504  """Update a field of a given task."""
505  tstamp = time.time()
506  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', %s =' % fieldName,DbParam(value),
507  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
508  return cursor.rowcount
509 
510 
511  def getValue(self, what, qual=()):
512  """Get a single value from the task database. If the query results in more than one value, only
513  the first value is returned."""
514  q = [ 'select %s from TASKS' % what ]
515  q.extend(qual)
516  return self.execute(q).fetchone()[0]
517 
518 
519  def getCount(self, what, qual=()):
520  # Except when using count(distinct(...)) which is not supported by earlier sqlite versions,
521  # better use getNTasks
522  q = [ 'select %s from TASKS' % what ]
523  q.extend(qual)
524  return len(self.execute(q).fetchall())
525 
526 
527  def getNTasks(self, qual=()):
528  q = [ 'select count(*) from TASKS' ]
529  q.extend(qual)
530  return self.execute(q).fetchone()[0]
531 
532  def getTaskValue(self, dsName,taskName, what):
533  """Get a single value from the task database. If the query results in more than one value, only
534  the first value is returned."""
535  q = [ 'select %s from TASKS where DSNAME =' % what , DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
536  return self.execute(q).fetchone()[0]
537 
538  def taskIter(self, what='*', qual=('order by UPDATED',)):
539  q = [ 'select %s from TASKS' % what ]
540  q.extend(qual)
541  cursor = self.execute(q)
542  return iter(cursor.fetchone,None)
543 
544 
545  def taskIterDict(self, what='*', qual=('order by UPDATED',), limit=999999999):
546  # NOTE: We could implement this more elegantly using row_factory (sqlite3)
547  # or rowfactory (Oracle), but this would come at the expense of a
548  # direct database dependence. Also, the primitive limit on the
549  # number of items returned by the generator avoids database-
550  # dependent SQL limit statements (until limit in TaskManager.execute
551  # is fully implemented also for Oracle.
552  q = [ 'select %s from TASKS' % what ]
553  q.extend(qual)
554  cursor = self.execute(q)
555  n = 0
556  while n<limit:
557  row = cursor.fetchone()
558  if row is None: break
559  n += 1
560  yield dictFactory(cursor,row)
561 
562 
563  def getTaskDict(self, dsname, taskname, what='*', qual=()):
564  # NOTE: We could implement this more elegantly using row_factory (sqlite3)
565  # or rowfactory (Oracle), but this would come at the expense of a
566  # direct database dependence.
567  if qual:
568  q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME =', DbParam(taskname) ]
569  q.extend(qual)
570  else:
571  if self.dbtype=='oracle':
572  # WARNING: DOES THIS ALWAYS WORK? There anyway should only be a single task here?
573  q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'and ROWNUM <= 1 order by UPDATED' ]
574  else:
575  q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'order by UPDATED limit 1' ]
576  cursor = self.execute(q)
577  row = cursor.fetchone()
578  if row is None:
579  return {}
580  else:
581  return dictFactory(cursor,row)
582 
583 
584  def getDSNames(self,dsname):
585  """Get all DSNAMEs for a given (partial) data set name dsname."""
586  #q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by UPDATED" % dsname ]
587  q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by DSNAME" % dsname ]
588  return self.execute(q).fetchall()
589 
590 
591  def getTaskNames(self,dsname,taskname=None,addWildCards=True):
592  """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
593  may be any partial dataset and task name, respectively."""
594  if addWildCards:
595  if taskname:
596  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
597  else:
598  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
599  else:
600  if taskname:
601  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
602  else:
603  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
604  return self.execute(q).fetchall()
605 
606 
class TaskAnalyzer:
    """TaskAnalyzer is a class for analyzing the files of a given task."""

    def __init__(self,jobDir,dsName,taskName):
        """Set up the analysis of task taskName of dataset dsName below directory jobDir.

        The task directory is <jobDir>/<dsName>/<taskName>. Status and on-disk
        status start as UNKNOWN and all counters as zero.
        """
        self.jobDir = jobDir
        self.dsName = dsName
        self.taskName = taskName
        self.path = jobDir+'/'+dsName+'/'+taskName

        self.status = TaskManager.StatusCodes['UNKNOWN']
        self.onDisk = TaskManager.OnDiskCodes['UNKNOWN']
        self.nResultFiles = 0
        self.nJobs = 0
        self.nJobsSubmitted = 0
        self.nJobsRunning = 0
        self.nJobsPostProc = 0
        self.nJobsFailed = 0
        self.nJobsCompleted = 0


    def isOnDisk(self):
        """Check if task directories are still on disk.

        Returns whether the task directory exists; if it does not, the on-disk
        status is set to DELETED.
        """
        onDisk = os.path.exists(self.path)
        if not onDisk:
            self.onDisk = TaskManager.OnDiskCodes['DELETED']
        return onDisk


    def _countFiles(self,pattern):
        """Return the number of paths matching glob pattern below the task directory."""
        return len(glob.glob(self.path+pattern))


    def analyzeFiles(self):
        """Determine job counters and task status from the files in the task directory.

        Nothing but the on-disk check is done if the task directory does not
        exist. Returns whether the task directory is on disk.
        """
        onDisk = self.isOnDisk()
        if onDisk:
            self.nJobs = self._countFiles('/*/*.config.py')
            # Entries in the task directory that are not accounted for by a job
            self.nResultFiles = self._countFiles('/*') - self.nJobs
            self.nJobsSubmitted = self._countFiles('/*/*.SUBMITTED')
            self.nJobsRunning = self._countFiles('/*/*.RUNNING')
            self.nJobsPostProc = self._countFiles('/*/*.POSTPROCESSING')
            self.nJobsCompleted = self._countFiles('/*/*.COMPLETED')
            # Completed jobs without a zero exit marker count as failed
            self.nJobsFailed = self.nJobsCompleted - self._countFiles('/*/*.exit.0')

            # Later checks override earlier ones, so the order below matters
            self.status = TaskManager.StatusCodes['UNKNOWN']
            if self.nJobs>0:
                self.status = TaskManager.StatusCodes['CONFIGURED']
                if self.nJobs==self.nJobsCompleted:
                    self.status = TaskManager.StatusCodes['COMPLETED']

            if self.nJobsSubmitted>0:
                self.status = TaskManager.StatusCodes['SUBMITTED']
            if self.nJobsRunning>0:
                self.status = TaskManager.StatusCodes['RUNNING']
            if self.nJobsPostProc>0:
                self.status = TaskManager.StatusCodes['POSTPROCESSING']
            if self.nJobsFailed>0:
                if self.nJobsSubmitted>0 or self.nJobsRunning>0 or self.nJobsPostProc>0:
                    self.status = TaskManager.StatusCodes['PARTIALFAILED']
                else:
                    self.status = TaskManager.StatusCodes['FAILED']

        return onDisk


    def updateStatus(self,taskman):
        """Pass the current status and counters of this task to taskman.updateStatus."""
        taskman.updateStatus(self.dsName,self.taskName,
                             self.status,self.nResultFiles,
                             self.nJobs,self.nJobsSubmitted,self.nJobsRunning,
                             self.nJobsPostProc,self.nJobsFailed,self.nJobsCompleted)
671 
672 
673 
class JobAnalyzer:
    """JobAnalyzer is a class for analyzing the jobs of a given task."""

    def __init__(self,jobDir,dsName,taskName):
        """Set up the analysis of the jobs in task directory <jobDir>/<dsName>/<taskName>."""
        self.jobDir = jobDir
        self.dsName = dsName
        self.taskName = taskName
        self.path = jobDir+'/'+dsName+'/'+taskName
        #self.maxWallTime = 5*86400 # max time job should be running in s


    def jobList(self):
        """Return the names of the entries in the task directory.

        Returns an empty list if the directory cannot be listed.
        """
        try:
            return os.listdir(self.path)
        except (OSError,ValueError):
            return []


    def jobStatus(self,jobName):
        """Determine the status of job jobName from the marker files in its directory.

        Returns the tuple (status,exitcode), where status is one of
        TaskManager.StatusCodes and exitcode is the content of the job's
        *.exitstatus.dat file. exitcode is '' unless the job has a *.COMPLETED
        marker and the file could be read. status is UNKNOWN if the job
        directory does not exist.
        """
        status = TaskManager.StatusCodes['UNKNOWN']
        exitcode = ''
        p = self.path+'/'+jobName
        if os.path.exists(p):
            # Later checks override earlier ones, so the order below matters
            if glob.glob(p+'/*.config.py'):
                status = TaskManager.StatusCodes['CONFIGURED']
            if glob.glob(p+'/*.SUBMITTED'):
                status = TaskManager.StatusCodes['SUBMITTED']
            # Protect against jobs dying w/o updating time stamp files - must be
            # accompanied by similar protection in TaskAnalyzer
            #r = glob.glob(p+'/*.RUNNING'):
            #if r:
            #    status = TaskManager.StatusCodes['RUNNING']
            #    if len(r)==1 && (time.time()-os.path.getctime(r[0]))>self.maxWallTime:
            #        status = TaskManager.StatusCodes['FAILED']
            if glob.glob(p+'/*.RUNNING'):
                status = TaskManager.StatusCodes['RUNNING']
            if glob.glob(p+'/*.POSTPROCESSING'):
                status = TaskManager.StatusCodes['POSTPROCESSING']
            if glob.glob(p+'/*.COMPLETED'):
                status = TaskManager.StatusCodes['COMPLETED']
                # Check if job completed w/o producing an exit status
                if len(glob.glob(p+'/*.exit.0'))==0:
                    status = TaskManager.StatusCodes['FAILED']
                # Reading the exit status is best effort: a missing or unreadable
                # file leaves exitcode empty
                try:
                    with open(glob.glob(p+'/*.exitstatus.dat')[0]) as f:
                        exitcode = f.read()
                except (IndexError,OSError,ValueError):
                    pass
            # Any exit marker other than *.exit.0 marks the job as failed
            if len(glob.glob(p+'/*.exit.*'))>len(glob.glob(p+'/*.exit.0')):
                status = TaskManager.StatusCodes['FAILED']
        return (status,exitcode)
725 
726 
727 #
728 # Test code for modules
729 #
if __name__ == '__main__':
    # Usage examples, kept for reference; running the module directly does nothing:
    #t = TaskManager('sqlite_file:test.db', True)
    #t = TaskManager('sqlite_file:test.db')
    #t = TaskManager('oracle:USER/PASSWORD@HOST')
    #t.addTask('foo','bar','VertexTemplate.py','15.4.0',55)
    #t.addTask('foo','bbb','VertexTemplate.py','15.4.0',55)
    pass
read
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)
Definition: openCoraCool.cxx:569
python.TaskManager.TaskManager.execute
def execute(self, statementParts, doCommit=False, limit=None)
Definition: TaskManager.py:313
python.TaskManager.TaskManager.addTask
def addTask(self, dsName, taskName, template, release, njobs, taskpostprocsteps='', status=StatusCodes['SUBMITTED'], onDisk=OnDiskCodes['ALLONDISK'], createdTime=None, createdUser=None, createdHost=None, comment='')
Definition: TaskManager.py:397
python.TaskManager.TaskManager.__enter__
def __enter__(self)
Definition: TaskManager.py:206
python.TaskManager.JobAnalyzer.path
path
Definition: TaskManager.py:681
python.TaskManager.TaskManager.getCount
def getCount(self, what, qual=())
Definition: TaskManager.py:519
python.TaskManager.TaskAnalyzer.nJobs
nJobs
Definition: TaskManager.py:619
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TaskManager.TaskManagerDatabaseError
Definition: TaskManager.py:31
python.TaskManager.JobAnalyzer.jobList
def jobList(self)
Definition: TaskManager.py:685
python.TaskManager.TaskAnalyzer.isOnDisk
def isOnDisk(self)
Definition: TaskManager.py:627
python.TaskManager.getJobConfig
def getJobConfig(jobDir, dsName, taskName, jobName=' *')
Definition: TaskManager.py:103
python.TaskManager.appendUnique
def appendUnique(s, v)
Definition: TaskManager.py:66
python.TaskManager.TaskManager.getStatus
def getStatus(self, dsName, taskName)
Definition: TaskManager.py:460
python.TaskManager.TaskAnalyzer.nResultFiles
nResultFiles
Definition: TaskManager.py:618
python.TaskManager.DbParam.__repr__
def __repr__(self)
Definition: TaskManager.py:118
python.TaskManager.TaskAnalyzer.dsName
dsName
Definition: TaskManager.py:612
python.TaskManager.TaskAnalyzer.taskName
taskName
Definition: TaskManager.py:613
python.Utils.getRunFromName
def getRunFromName(name, default='', asInt=False)
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:13
python.TaskManager.TaskManager.getDSNames
def getDSNames(self, dsname)
Definition: TaskManager.py:584
python.TaskManager.TaskAnalyzer.nJobsCompleted
nJobsCompleted
Definition: TaskManager.py:624
python.TaskManager.DbParam
Definition: TaskManager.py:114
python.TaskManager.TaskManagerCheckError
Definition: TaskManager.py:28
python.TaskManager.TaskManager.dbtype
dbtype
Definition: TaskManager.py:181
python.TaskManager.TaskAnalyzer.status
status
Definition: TaskManager.py:616
python.TaskManager.TaskManager.paramstyle
paramstyle
Definition: TaskManager.py:176
python.TaskManager.JobAnalyzer.taskName
taskName
Definition: TaskManager.py:680
python.TaskManager.TaskManager.getTaskValue
def getTaskValue(self, dsName, taskName, what)
Definition: TaskManager.py:532
python.TaskManager.TaskManager.__exit__
def __exit__(self, exc_type, exc_value, traceback)
Definition: TaskManager.py:211
python.TaskManager.TaskManager.__init__
def __init__(self, connstring='', createDatabase=False)
Definition: TaskManager.py:168
python.TaskManager.TaskManager.getValue
def getValue(self, what, qual=())
Definition: TaskManager.py:511
python.TaskManager.DbParam.value
value
Definition: TaskManager.py:117
python.TaskManager.TaskManager
Definition: TaskManager.py:122
python.TaskManager.TaskManager.parseConnectionInfo
def parseConnectionInfo(self, connstring='')
Definition: TaskManager.py:146
python.TaskManager.TaskManager.deleteTask
def deleteTask(self, dsName, taskName)
Definition: TaskManager.py:390
python.TaskManager.DbParam.__init__
def __init__(self, value)
Definition: TaskManager.py:116
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
python.TaskManager.TaskManager.updateStatus
def updateStatus(self, dsName, taskName, status, nResultFiles, nJobs, nJobsSubmitted, nJobsRunning, nJobsPostProc, nJobsFailed, nJobsCompleted)
Definition: TaskManager.py:486
python.TaskManager.JobAnalyzer.jobStatus
def jobStatus(self, jobName)
Definition: TaskManager.py:693
PlotPulseshapeFromCool.input
input
Definition: PlotPulseshapeFromCool.py:106
python.TaskManager.JobAnalyzer.__init__
def __init__(self, jobDir, dsName, taskName)
Definition: TaskManager.py:677
python.TaskManager.getFullTaskNames
def getFullTaskNames(taskman, dsname, taskname, requireSingleTask=False, confirmWithUser=False, addWildCards=True)
Definition: TaskManager.py:74
python.TaskManager.JobAnalyzer.dsName
dsName
Definition: TaskManager.py:679
python.TaskManager.TaskAnalyzer.updateStatus
def updateStatus(self, taskman)
Definition: TaskManager.py:667
python.TaskManager.TaskAnalyzer.path
path
Definition: TaskManager.py:614
python.TaskManager.JobAnalyzer.jobDir
jobDir
Definition: TaskManager.py:678
python.TaskManager.TaskManager.getTaskDict
def getTaskDict(self, dsname, taskname, what=' *', qual=())
Definition: TaskManager.py:563
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.TaskManager.TaskManager.debug
debug
Definition: TaskManager.py:175
python.TaskManager.getKey
def getKey(d, v)
Definition: TaskManager.py:43
Trk::open
@ open
Definition: BinningType.h:40
python.TaskManager.TaskAnalyzer.__init__
def __init__(self, jobDir, dsName, taskName)
Definition: TaskManager.py:610
python.TaskManager.TaskAnalyzer.nJobsFailed
nJobsFailed
Definition: TaskManager.py:623
python.TaskManager.TaskManager.taskIter
def taskIter(self, what=' *', qual=('order by UPDATED',))
Definition: TaskManager.py:538
query_example.cursor
cursor
Definition: query_example.py:21
python.TaskManager.TaskManager.setDiskStatus
def setDiskStatus(self, dsName, taskName, onDisk)
Definition: TaskManager.py:478
python.TaskManager.TaskManager.is_managing_context
is_managing_context
Definition: TaskManager.py:177
python.TaskManager.TaskManager.dbname
dbname
Definition: TaskManager.py:179
python.TaskManager.TaskManager.getTaskNames
def getTaskNames(self, dsname, taskname=None, addWildCards=True)
Definition: TaskManager.py:591
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
python.TaskManager.TaskAnalyzer
Definition: TaskManager.py:607
python.TaskManager.TaskAnalyzer.nJobsRunning
nJobsRunning
Definition: TaskManager.py:621
python.TaskManager.TaskManager.dbcon
dbcon
Definition: TaskManager.py:189
python.TaskManager.getStatusClass
def getStatusClass(status)
Definition: TaskManager.py:53
python.TaskManager.TaskManager._createOracleSchema
def _createOracleSchema(self)
Definition: TaskManager.py:255
python.TaskManager.TaskAnalyzer.onDisk
onDisk
Definition: TaskManager.py:617
pickleTool.object
object
Definition: pickleTool.py:30
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
python.TaskManager.TaskAnalyzer.nJobsSubmitted
nJobsSubmitted
Definition: TaskManager.py:620
python.TaskManager.TaskManager.setStatus
def setStatus(self, dsName, taskName, status, oldStatus=None)
Definition: TaskManager.py:466
calibdata.commit
bool commit
Definition: calibdata.py:832
python.TaskManager.dictFactory
def dictFactory(cursor, row)
Definition: TaskManager.py:35
python.TaskManager.TaskAnalyzer.nJobsPostProc
nJobsPostProc
Definition: TaskManager.py:622
python.TaskManager.TaskManager._createSQLiteSchema
def _createSQLiteSchema(self)
Definition: TaskManager.py:218
python.TaskManager.TaskManager.getNTasks
def getNTasks(self, qual=())
Definition: TaskManager.py:527
python.TaskManager.TaskAnalyzer.jobDir
jobDir
Definition: TaskManager.py:611
python.Utils.getUserName
def getUserName(default='UNKNOWN')
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:48
python.TaskManager.TaskAnalyzer.analyzeFiles
def analyzeFiles(self)
Definition: TaskManager.py:635
python.TaskManager.JobAnalyzer
Definition: TaskManager.py:674
python.TaskManager.TaskManager.setValue
def setValue(self, dsName, taskName, fieldName, value)
Definition: TaskManager.py:503
python.TaskManager.TaskManager.taskIterDict
def taskIterDict(self, what=' *', qual=('order by UPDATED',), limit=999999999)
Definition: TaskManager.py:545