ATLAS Offline Software
TaskManager.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
4 
5 """
6 TaskManager is a tool for keeping track of JobRunner jobs using a
7 database. TaskManager uses the notion of a task as the primary unit for
8 bookkeeping. Each task includes one or more jobs run using the same
9 JobRunner template and typically resulting from a single invocation of
10 JobRunner through one of the makeDPD.py or runDPD.py
11 scripts. TaskManager keeps track of the status of tasks and associated
12 jobs, runs postprocessing scripts, and accumulates results for later
13 analysis across tasks.
14 
15 Written by Juerg Beringer (LBNL) in 2009.
16 """
17 __author__ = 'Juerg Beringer'
18 __version__ = 'TaskManager.py atlas/athena'
19 
20 import time, os, glob
21 
22 from InDetBeamSpotExample.Utils import getRunFromName
23 from InDetBeamSpotExample.Utils import getUserName
24 
25 # Exception classes
class TaskManagerCheckError(Exception):
    """Task manager exception raised when a task selection check fails.

    Used when no task matches, when a single task is required but several
    match, or when the user declines to confirm.
    """
28 
class TaskManagerDatabaseError(Exception):
    """Task manager exception raised because of a database error."""
31 
32 
def dictFactory(cursor, row):
    """Convert a tuple from a database query into a dictionary.

    The keys are the column names, taken from the first field of each entry
    in cursor.description; each value is the element of row found at the
    same position as its column.
    """
    return {column[0]: row[position]
            for position, column in enumerate(cursor.description)}
39 
40 
def getKey(d, v):
    """Get the key for which dictionary d has an entry with value v.

    Returns 'Undefined' if there is no such key, or if several keys map to
    the value v.
    """
    matches = [key for key, value in d.items() if value == v]
    return matches[0] if len(matches) == 1 else 'Undefined'
49 
50 
def getStatusClass(status):
    """Returns 'ok', 'warn' or 'bad' depending on the value of status.

    Status 10 maps to 'ok', statuses 7 and 11 map to 'bad', and every other
    value maps to 'warn'.
    """
    return {7: 'bad', 10: 'ok', 11: 'bad'}.get(status, 'warn')
62 
63 
def appendUnique(s, v):
    """Append word v to the space-separated word list s unless already present.

    s may be None or empty, in which case v alone is returned (without a
    spurious leading space). If v is already one of the whitespace-separated
    words of s, s is returned unchanged.
    """
    if not s:
        # Covers both None and '': nothing to join with, so the result is just v.
        return v
    if v in s.split():
        return s
    return ' '.join([s, v])
70 
71 
def getFullTaskNames(taskman,dsname,taskname,requireSingleTask=False,confirmWithUser=False,addWildCards=True):
    """Retrieve the full dataset and task names given a pair of (dsname,task) that may
    contain wildcards or be just a partial name such as the run number.

    The lookup is delegated to taskman.getTaskNames(dsname,taskname,addWildCards) and
    its result is returned. A TaskManagerCheckError is raised if no task is found, if
    requireSingleTask is set and the number of tasks found is not one, or if
    confirmWithUser is set and the user answers anything other than 'y'.
    """
    matches = taskman.getTaskNames(dsname,taskname,addWildCards)
    nMatches = len(matches)
    if nMatches==0:
        raise TaskManagerCheckError ('ERROR: No tasks found for dataset = %s, task = %s' % (dsname,taskname))

    # The same table layout is used for the error message and the confirmation prompt
    header = " %-50s %s" % ('DATASET NAME','TASK NAME')
    ruler = " %s" % (75*'-')
    rows = [" %-50s %s" % (entry[0],entry[1]) for entry in matches]

    if requireSingleTask and nMatches!=1:
        intro = "ERROR: Multiple data set names found for dataset = %s, task = %s\n Please use full dataset or task name from list below, using option -n if necessary:\n\n" % (dsname,taskname)
        table = ''.join(line+'\n' for line in [header,ruler]+rows)
        raise TaskManagerCheckError (intro+table+'\n')

    if confirmWithUser:
        print ('Please confirm that you want to execute this command for the following tasks:\n')
        for line in [header,ruler]+rows:
            print (line)
        answer = input('\nARE YOU SURE [n] ? ')
        if answer!='y':
            raise TaskManagerCheckError ('ERROR: Aborted by user')
        print()
    return matches
99 
100 
def getJobConfig(jobDir,dsName,taskName,jobName='*'):
    """Read config dict from job files.

    Looks for files matching jobDir/dsName/taskName/jobName/*.config.py.final.py
    and, if there are none, for *.config.py in the same place. The first file
    found is executed as Python code and the 'jobConfig' object it defines is
    returned.

    Raises KeyError if no config file is found or the file does not define
    jobConfig.
    """
    config = {}
    jobPath = '%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName)
    configFile = glob.glob('%s/%s' % (jobPath,'*.config.py.final.py'))
    if not configFile:
        configFile = glob.glob('%s/%s' % (jobPath,'*.config.py'))
    if configFile:
        # Read through a context manager so the file handle is closed promptly
        with open(configFile[0]) as f:
            source = f.read()
        # NOTE(review): this executes the config file as arbitrary Python code;
        # only use on job directories whose contents are trusted.
        exec(source,config)   # Eval config file and put defs into config dict
    return config['jobConfig']
110 
111 
class DbParam:
    """A utility class to hold a single parameter to be used in SQL queries."""

    def __init__(self,value):
        """Store value as the parameter to be bound."""
        self.value = value

    def __repr__(self):
        """Return the repr of the wrapped value."""
        # repr() rather than '%r' % value: the %-operator would unpack a tuple
        # value as multiple format arguments and fail.
        return repr(self.value)
118 
119 
class TaskManager:
    """TaskManager is a tool for keeping track of JobRunner jobs."""

    StatusCodes = {
        'UNKNOWN': -1,
        'CONFIGURED': 0,
        'SUBMITTED': 1,
        'RUNNING': 2,
        'PARTIALFAILED': 3,
        # Codes above are considered tasks whose status needs to
        # be monitored and updated
        'POSTPROCESSING': 5,
        'POSTPROCRUNNING': 6,
        'POSTPROCFAILED': 7,
        'COMPLETED': 10,
        'FAILED': 11,
    }

    OnDiskCodes = {
        'UNKNOWN': -1,
        'ALLONDISK': 0,
        'RESULTSONDISK': 1,
        # Codes above are for jobs whose task directory is on disk
        'ARCHIVED': 10,
        'DELETED': 11,
    }

    @classmethod
    def parseConnectionInfo(cls, connstring=''):
        """Split a connection string into (dbtype, dbname).

        If connstring is empty, the environment variable TASKDB is used, or
        'sqlite_file:taskdata.db' if TASKDB is not set. A connection string of
        type 'auth_file' names a file whose (stripped) contents are the actual
        connection string, which is then split in turn.

        Raises ValueError if the connection string has no ':' separator or the
        authorization file cannot be read or parsed.
        """
        if not connstring:
            connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')

        try:
            dbtype, dbname = connstring.split(':', 1)
        except Exception as err:
            raise ValueError ('Illegal database connection string {}'.format(connstring)) from err

        if dbtype == 'auth_file':
            # dbname is a file with the actual connection information
            authfile = dbname
            try:
                with open(authfile) as af:
                    connstring = af.read().strip()
                dbtype, dbname = connstring.split(':', 1)
            except Exception as err:
                raise ValueError ('Invalid authorization file {} (not readable or invalid format)'.format(authfile)) from err

        return dbtype, dbname


    def __init__(self, connstring='', createDatabase=False):
        """Constructor. connstring is a connection string specifying either a SQLite file
        ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
        ("auth_file:...") with connection information. If connstring is empty, the connection
        information will be taken from TASKDB (if set), or otherwise defaults to
        'sqlite_file:taskdata.db'.

        If createDatabase is set, the schema is created after connecting. For SQLite,
        ValueError is raised if the file exists and createDatabase is set, or if it
        does not exist and createDatabase is not set. ValueError is also raised for
        an unknown database type."""

        self.debug = False
        self.paramstyle = None
        self.is_managing_context = False

        self.dbtype, self.dbname = self.__class__.parseConnectionInfo(connstring)

        if self.dbtype == 'sqlite_file':
            import sqlite3
            self.paramstyle = 'qmark'
            dbexists = os.access(self.dbname, os.F_OK)
            if dbexists and createDatabase:
                raise ValueError ('SQLite file {} exists already - remove before recreating'.format(self.dbname))
            if not (dbexists or createDatabase):
                raise ValueError ('TaskManager database not found (SQLite file {})'.format(self.dbname))
            self.dbcon = sqlite3.connect(self.dbname)
            if createDatabase:
                self._createSQLiteSchema()
        elif self.dbtype == 'oracle':
            import cx_Oracle
            self.paramstyle = 'named'
            try:
                self.dbcon = cx_Oracle.connect(self.dbname)
            except Exception:
                # A single retry after a pause; a second failure propagates to the caller
                print ('ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
                time.sleep(10)
                self.dbcon = cx_Oracle.connect(self.dbname)
            if createDatabase:
                self._createOracleSchema()
        else:
            raise ValueError ('Unknown database type: {}'.format(self.dbtype))

    def __enter__(self):
        ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
        self.is_managing_context = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        ''' Close the database connection at the end of the 'with' statement. '''
        try:
            self.dbcon.close()
        except Exception:
            print ('ERROR: Unable to close database connection')

    def _createSQLiteSchema(self):
        """Create the database schema for a SQLite3 database."""
        self.dbcon.cursor().executescript("""
            create table tasks (
                TASKID integer primary key autoincrement,
                DSNAME text,
                RUNNR integer,
                TASKNAME text,
                TEMPLATE text,
                TASKPOSTPROCSTEPS text,
                ATLREL text,
                CREATED real,
                CREATED_USER text,
                CREATED_HOST text,
                UPDATED real,
                STATUS integer,
                ONDISK integer,
                LOGFILES text,
                COOLTAGS text,
                NRESULTFILES integer default 0,
                NJOBS integer default 0,
                NJOBS_SUBMITTED integer default -1,
                NJOBS_RUNNING integer default -1,
                NJOBS_POSTPROC integer default -1,
                NJOBS_FAILED integer default -1,
                NJOBS_COMPLETED integer default -1,
                RESULTFILES text,
                RESULTLINKS text,
                TASKCOMMENT text,
                STATUS_POSTPROC integer,
                BASEPATH text,
                AUXDATA text
            );
            """)
        self.dbcon.commit()


    def _createOracleSchema(self):
        """Create the database schema for an Oracle database."""
        # Drop leftovers from a previous schema; the first failing drop is
        # printed and the remaining drops are skipped.
        try:
            self.dbcon.cursor().execute('drop sequence TASKS_SEQ')
            self.dbcon.cursor().execute('drop trigger TASKS_ID_TRIGGER')
            self.dbcon.cursor().execute('drop table TASKS')
        except Exception as e:
            print (e)
        self.dbcon.cursor().execute("""
            create table tasks (
                TASKID number(10,0),
                DSNAME varchar2(256),
                RUNNR number(10,0),
                TASKNAME varchar2(256),
                TEMPLATE varchar2(256),
                TASKPOSTPROCSTEPS varchar2(256),
                ATLREL varchar2(256),
                CREATED number,
                CREATED_USER varchar2(256),
                CREATED_HOST varchar2(256),
                UPDATED number,
                STATUS number(5,0),
                ONDISK number(5,0),
                LOGFILES varchar2(256),
                COOLTAGS varchar2(256),
                NRESULTFILES number(10,0) default 0,
                NJOBS number(10,0) default 0,
                NJOBS_SUBMITTED number(10,0) default -1,
                NJOBS_RUNNING number(10,0) default -1,
                NJOBS_POSTPROC number(10,0) default -1,
                NJOBS_FAILED number(10,0) default -1,
                NJOBS_COMPLETED number(10,0) default -1,
                RESULTFILES varchar2(4000),
                RESULTLINKS varchar2(4000),
                TASKCOMMENT varchar2(4000),
                STATUS_POSTPROC number(5,0) default -1,
                BASEPATH varchar2(256),
                AUXDATA varchar2(4000),
                constraint BEAMSPOT_TASKS_PK primary key (TASKID)
            )
            """)
        self.dbcon.cursor().execute('create index TASKS_DSNAME_INDX on TASKS(DSNAME)')
        self.dbcon.cursor().execute('create index TASKS_RUNNR_INDX on TASKS(RUNNR)')
        self.dbcon.cursor().execute('alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
        self.dbcon.cursor().execute('create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
        # The following statement must not have whitespace at the beginning of the line, otherwise
        # the trigger function won't be parsed correctly:
        self.dbcon.cursor().execute("""
create or replace trigger TASKS_ID_TRIGGER
before insert on TASKS
for each row
begin
    select TASKS_SEQ.nextval into :new.TASKID from dual;
end;
""")
        self.dbcon.commit()


    def execute(self,statementParts, doCommit=False, limit=None):
        """Execute a SQL statement passed as a list or tuple of statement parts, where each
        part is either a partial SQL string, or an object of type DbParam specifying a parameter
        for the SQL statement. The actual SQL statement is assembled (using the parameter style
        of the current database) and executed, and the resulting cursor is returned.
        Loosely follows the method discussed in the Python Cookbook.
        WARNING: At present, limit doesn't work when ordering rows for Oracle!

        Returns None if statementParts is empty. Raises TypeError if a plain string is
        passed, ValueError for parts of the wrong type or an unknown parameter style, and
        TaskManagerDatabaseError if the database reports an error. If doCommit is set,
        the connection is committed after a successful execution."""
        if not self.is_managing_context:
            print ('**WARN** TaskManager will keep the database connection open until it is deleted.')
            print ('**INFO** TaskManager should generally only be used inside a with statement:')
            print ('**INFO** with TaskManager(...) as taskman:')
            print ('**INFO** # do something ...')

        if not statementParts:
            return None
        if isinstance(statementParts,str):
            raise TypeError ('Must pass list or tuple to TaskManager.execute')
        for p in statementParts:
            if not isinstance(p,(DbParam,str)):
                raise ValueError ('Can only pass SQL string fragments and DbParam objects in list '
                                  'to TaskManager.execute, found %s with "%s"' % (type(p),str(p)))

        if self.paramstyle=='qmark':
            # Positional placeholders: '?' in the SQL, values in a list
            sqlParts = []
            params = []
            for p in statementParts:
                if isinstance(p,DbParam):
                    sqlParts.append('?')
                    params.append(p.value)
                else:
                    sqlParts.append(p)
        elif self.paramstyle=='named':
            # Named placeholders: ':p0', ':p1', ... in the SQL, values in a dict
            sqlParts = []
            params = {}
            for p in statementParts:
                if isinstance(p,DbParam):
                    name = 'p%d' % len(params)
                    sqlParts.append(':%s' % name)
                    params[name] = p.value
                else:
                    sqlParts.append(p)
        else:
            raise ValueError ('Unknown SQL parameter style %s' % self.paramstyle)
        sql = ' '.join(sqlParts)

        # Handle database-dependent limit clause (PRELIMINARY)
        if limit:
            if self.dbtype=='oracle':
                # WARNING: the following doesn't work always when ordering rows!
                if 'order by' in sql:   # WARNING: works only if order by is in lowercase!
                    sqlParts2 = sql.split('order by')
                    sql = '%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
                else:
                    sql += ' and ROWNUM <= %i' % limit
            else:
                sql += ' limit %i' % limit

        if self.debug:
            print ('\nExecuting SQL statement: ',sql)
            print (' with parameters: ',params)

        cursor = self.dbcon.cursor()
        try:
            cursor.execute(sql,params)
            if doCommit:
                self.dbcon.commit()
        except Exception as e:
            msg = '\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
            raise TaskManagerDatabaseError (msg) from e
        return cursor


    def deleteTask(self,dsName,taskName):
        """Delete a task entry from the taskmanager database and return the cursor's rowcount."""
        q = [ 'delete from TASKS where DSNAME =',DbParam(dsName),'and TASKNAME =',DbParam(taskName) ]
        cursor = self.execute(q,True)
        return cursor.rowcount


    def addTask(self,dsName,taskName,template,release,njobs,
                taskpostprocsteps='',
                status=StatusCodes['SUBMITTED'],
                onDisk=OnDiskCodes['ALLONDISK'],
                createdTime=None,
                createdUser=None,
                createdHost=None,
                comment=''):
        """Add an entry for a new task if the task doesn't exist already. If the task exists,
        its UPDATED, NJOBS, STATUS and ONDISK fields will be updated.

        For an existing task, njobs is added to the stored NJOBS, a release not yet
        contained in ATLREL is appended to it, and STATUS is only changed while the
        stored status is below POSTPROCRUNNING. An existing task with a different
        template is not modified; an error message is printed instead."""
        tstamp = time.time()
        taskSelection = ['where DSNAME =',DbParam(dsName),'and TASKNAME =',DbParam(taskName)]
        nDefined = self.getNTasks(taskSelection)
        mayUpdateStatus = self.getNTasks(taskSelection + ['and STATUS < %i' % TaskManager.StatusCodes['POSTPROCRUNNING']])

        if nDefined:
            # Task entry exists already; only update UPDATED, NJOBS, STATUS, ONDISK and possibly ATLREL
            task = self.getTaskDict(dsName,taskName)
            if task['TEMPLATE']!=template:
                print ('ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task['TEMPLATE'],template))
            else:
                updateStr = ['update TASKS set UPDATED =',DbParam(tstamp),
                             ', NJOBS = ',DbParam(task['NJOBS']+njobs),
                             ', ONDISK = ',DbParam(onDisk)]

                if release not in task['ATLREL']:
                    print ('WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task['ATLREL'],release))
                    release = '; '.join([task['ATLREL'],release])
                    updateStr += [', ATLREL = ',DbParam(release)]

                if mayUpdateStatus:
                    updateStr += [', STATUS = ',DbParam(status)]

                updateStr += ['where DSNAME = ',DbParam(dsName),'and TASKNAME = ',DbParam(taskName)]

                # Commit here, as the insert path below does; otherwise the update
                # is lost unless some later statement happens to commit.
                self.execute( updateStr, True)

        else:
            # New task entry
            if not createdTime:
                createdTime = tstamp
            if not createdUser:
                createdUser = getUserName()
            if not createdHost:
                createdHost = os.uname()[1]
            runnr = getRunFromName(dsName,None,True)
            self.execute( ['insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
                           DbParam(dsName),',',
                           DbParam(runnr),',',
                           DbParam(taskName),',',
                           DbParam(template),',',
                           DbParam(taskpostprocsteps),',',
                           DbParam(release),',',
                           DbParam(createdTime),',',
                           DbParam(createdUser),',',
                           DbParam(createdHost),',',
                           DbParam(tstamp),',',
                           DbParam(status),',',
                           DbParam(onDisk),',',
                           DbParam(njobs),',',
                           DbParam(comment),')'],
                          True)


    def getStatus(self,dsName,taskName):
        """Get status of a task."""
        cursor = self.execute( ['select STATUS from TASKS where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName) ] )
        return cursor.fetchone()[0]


    def setStatus(self,dsName,taskName,status,oldStatus=None):
        """Set the status of a task. If oldStatus is set, the task must be in the designated status.
        Returns the cursor's rowcount."""
        tstamp = time.time()
        q = ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
             'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
        # Compare against None explicitly: status code 0 (CONFIGURED) is a valid
        # oldStatus and must not be treated as "not set".
        if oldStatus is not None:
            q += ['and STATUS =',DbParam(oldStatus)]
        cursor = self.execute(q, True)
        return cursor.rowcount


    def setDiskStatus(self,dsName,taskName,onDisk):
        """Update the onDisk-status of a task and return the cursor's rowcount."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', ONDISK =',DbParam(onDisk),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount


    def updateStatus(self,dsName,taskName,
                     status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
        """Update task status information including number of jobs in different states.
        Returns the cursor's rowcount."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),
                                ', STATUS =',DbParam(status),
                                ', NRESULTFILES =',DbParam(nResultFiles),
                                ', NJOBS =',DbParam(nJobs),
                                ', NJOBS_SUBMITTED =',DbParam(nJobsSubmitted),
                                ', NJOBS_RUNNING =',DbParam(nJobsRunning),
                                ', NJOBS_POSTPROC =',DbParam(nJobsPostProc),
                                ', NJOBS_FAILED =',DbParam(nJobsFailed),
                                ', NJOBS_COMPLETED =',DbParam(nJobsCompleted),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount


    def setValue(self,dsName,taskName,fieldName,value):
        """Update a field of a given task and return the cursor's rowcount.

        fieldName is inserted into the SQL text as is (column names cannot be bound
        as parameters), so it must be a trusted column name."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', %s =' % fieldName,DbParam(value),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount


    def getValue(self, what, qual=()):
        """Get a single value from the task database. If the query results in more than one value, only
        the first value is returned."""
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        return self.execute(q).fetchone()[0]


    def getCount(self, what, qual=()):
        """Return the number of rows produced by selecting what from TASKS with qualifier qual.

        Except when using count(distinct(...)) which is not supported by earlier sqlite versions,
        better use getNTasks."""
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        return len(self.execute(q).fetchall())


    def getNTasks(self, qual=()):
        """Return count(*) of the TASKS rows selected by the statement parts in qual."""
        q = [ 'select count(*) from TASKS' ]
        q.extend(qual)
        return self.execute(q).fetchone()[0]

    def getTaskValue(self, dsName,taskName, what):
        """Get a single value from the task database. If the query results in more than one value, only
        the first value is returned."""
        q = [ 'select %s from TASKS where DSNAME =' % what , DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
        return self.execute(q).fetchone()[0]

    def taskIter(self, what='*', qual=('order by UPDATED',)):
        """Return an iterator over the row tuples selected by what and qual."""
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        cursor = self.execute(q)
        # Two-argument iter(): call fetchone() until it returns None
        return iter(cursor.fetchone,None)


    def taskIterDict(self, what='*', qual=('order by UPDATED',), limit=999999999):
        """Generate the rows selected by what and qual as dictionaries, at most limit of them."""
        # NOTE: We could implement this more elegantly using row_factory (sqlite3)
        #       or rowfactory (Oracle), but this would come at the expense of a
        #       direct database dependence. Also, the primitive limit on the
        #       number of items returned by the generator avoids database-
        #       dependent SQL limit statements (until limit in TaskManager.execute
        #       is fully implemented also for Oracle.
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        cursor = self.execute(q)
        n = 0
        while n<limit:
            row = cursor.fetchone()
            if row is None:
                break
            n += 1
            yield dictFactory(cursor,row)


    def getTaskDict(self, dsname, taskname, what='*', qual=()):
        """Return the first row matching dsname and taskname as a dictionary, or {} if there is none.

        If qual is given it is appended to the query; otherwise the query is
        restricted to a single row ordered by UPDATED."""
        # NOTE: We could implement this more elegantly using row_factory (sqlite3)
        #       or rowfactory (Oracle), but this would come at the expense of a
        #       direct database dependence.
        if qual:
            q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME =', DbParam(taskname) ]
            q.extend(qual)
        else:
            if self.dbtype=='oracle':
                # WARNING: DOES THIS ALWAYS WORK? There anyway should only be a single task here?
                q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'and ROWNUM <= 1 order by UPDATED' ]
            else:
                q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'order by UPDATED limit 1' ]
        cursor = self.execute(q)
        row = cursor.fetchone()
        if row is None:
            return {}
        else:
            return dictFactory(cursor,row)


    def getDSNames(self,dsname):
        """Get all DSNAMEs for a given (partial) data set name dsname."""
        # The pattern is bound as a parameter rather than pasted into the SQL
        # text, so quotes in dsname cannot break or alter the statement.
        q = [ 'select distinct(DSNAME) from TASKS where DSNAME like', DbParam('%%%s%%' % dsname), 'order by DSNAME' ]
        return self.execute(q).fetchall()


    def getTaskNames(self,dsname,taskname=None,addWildCards=True):
        """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
        may be any partial dataset and task name, respectively.

        With addWildCards set, both names are wrapped in '%' wildcards; otherwise they
        are used as LIKE patterns as given. An empty taskname matches any task."""
        pattern = '%%%s%%' if addWildCards else '%s'
        # Patterns are bound as parameters rather than pasted into the SQL text
        q = [ 'select DSNAME,TASKNAME from TASKS where DSNAME like', DbParam(pattern % dsname) ]
        if taskname:
            q += [ 'and TASKNAME like', DbParam(pattern % taskname) ]
        q.append('order by DSNAME,TASKNAME')
        return self.execute(q).fetchall()
604 
class TaskAnalyzer:
    """TaskAnalyzer is a class for analyzing the files of a given task."""

    def __init__(self,jobDir,dsName,taskName):
        """Remember the task location (jobDir/dsName/taskName) and reset status and job counters."""
        self.jobDir = jobDir
        self.dsName = dsName
        self.taskName = taskName
        self.path = jobDir+'/'+dsName+'/'+taskName

        self.status = TaskManager.StatusCodes['UNKNOWN']
        self.onDisk = TaskManager.OnDiskCodes['UNKNOWN']
        self.nResultFiles = 0
        self.nJobs = 0
        self.nJobsSubmitted = 0
        self.nJobsRunning = 0
        self.nJobsPostProc = 0
        self.nJobsFailed = 0
        self.nJobsCompleted = 0


    def isOnDisk(self):
        """Check if task directories are still on disk.

        If the task path does not exist, self.onDisk is set to DELETED."""
        exists = os.path.exists(self.path)
        if not exists:
            self.onDisk = TaskManager.OnDiskCodes['DELETED']
        return exists


    def analyzeFiles(self):
        """Count the job marker files below the task path and derive the task status.

        Returns whether the task directory is on disk; counters and status are
        only updated if it is."""
        onDisk = self.isOnDisk()
        if not onDisk:
            return onDisk

        def nMatching(pattern):
            # Number of paths below the task directory matching the glob pattern
            return len(glob.glob(self.path+pattern))

        self.nJobs = nMatching('/*/*.config.py')
        self.nResultFiles = nMatching('/*') - self.nJobs
        self.nJobsSubmitted = nMatching('/*/*.SUBMITTED')
        self.nJobsRunning = nMatching('/*/*.RUNNING')
        self.nJobsPostProc = nMatching('/*/*.POSTPROCESSING')
        self.nJobsCompleted = nMatching('/*/*.COMPLETED')
        # Completed jobs without an exit.0 marker count as failed
        self.nJobsFailed = self.nJobsCompleted - nMatching('/*/*.exit.0')

        codes = TaskManager.StatusCodes
        status = codes['UNKNOWN']
        if self.nJobs>0:
            if self.nJobs==self.nJobsCompleted:
                status = codes['COMPLETED']
            else:
                status = codes['CONFIGURED']

        # Later checks deliberately override earlier ones
        if self.nJobsSubmitted>0:
            status = codes['SUBMITTED']
        if self.nJobsRunning>0:
            status = codes['RUNNING']
        if self.nJobsPostProc>0:
            status = codes['POSTPROCESSING']
        if self.nJobsFailed>0:
            stillActive = self.nJobsSubmitted>0 or self.nJobsRunning>0 or self.nJobsPostProc>0
            status = codes['PARTIALFAILED'] if stillActive else codes['FAILED']
        self.status = status

        return onDisk


    def updateStatus(self,taskman):
        """Write the current status and job counters to the task database via taskman."""
        taskman.updateStatus(self.dsName,self.taskName,
                             self.status,self.nResultFiles,
                             self.nJobs,self.nJobsSubmitted,self.nJobsRunning,self.nJobsPostProc,self.nJobsFailed,self.nJobsCompleted)
669 
670 
671 
class JobAnalyzer:
    """JobAnalyzer is a class for analyzing the jobs of a given task."""

    def __init__(self,jobDir,dsName,taskName):
        """Remember the task location (jobDir/dsName/taskName)."""
        self.jobDir = jobDir
        self.dsName = dsName
        self.taskName = taskName
        self.path = jobDir+'/'+dsName+'/'+taskName
        #self.maxWallTime = 5*86400 # max time job should be running in s


    def jobList(self):
        """Return the directory entries of the task path, or [] if it cannot be listed."""
        try:
            l = os.listdir(self.path)
        except Exception:
            l = []
        return l


    def jobStatus(self,jobName):
        """Determine the status of job jobName from the marker files in its directory.

        Returns a tuple (status,exitcode), where status is one of
        TaskManager.StatusCodes and exitcode is the content of the job's
        *.exitstatus.dat file ('' if the job has not completed or the file
        cannot be read)."""
        status = TaskManager.StatusCodes['UNKNOWN']
        exitcode = ''
        p = self.path+'/'+jobName
        if os.path.exists(p):
            # Later checks deliberately override earlier ones
            if glob.glob(p+'/*.config.py'):
                status = TaskManager.StatusCodes['CONFIGURED']
            if glob.glob(p+'/*.SUBMITTED'):
                status = TaskManager.StatusCodes['SUBMITTED']
            # Protect against jobs dying w/o updating time stamp files - must be
            # accompanied by similar protection in TaskAnalyzer
            #r = glob.glob(p+'/*.RUNNING'):
            #if r:
            #    status = TaskManager.StatusCodes['RUNNING']
            #    if len(r)==1 && (time.time()-os.path.getctime(r[0]))>self.maxWallTime:
            #        status = TaskManager.StatusCodes['FAILED']
            if glob.glob(p+'/*.RUNNING'):
                status = TaskManager.StatusCodes['RUNNING']
            if glob.glob(p+'/*.POSTPROCESSING'):
                status = TaskManager.StatusCodes['POSTPROCESSING']
            if glob.glob(p+'/*.COMPLETED'):
                status = TaskManager.StatusCodes['COMPLETED']
                # Check if job completed w/o producing an exit status
                if len(glob.glob(p+'/*.exit.0'))==0:
                    status = TaskManager.StatusCodes['FAILED']
                # Best effort: a missing or unreadable exit status file leaves exitcode empty
                try:
                    with open(glob.glob(p+'/*.exitstatus.dat')[0]) as f:
                        exitcode = f.read()
                except Exception:
                    pass
            # Any exit marker other than exit.0 means the job failed
            if len(glob.glob(p+'/*.exit.*'))>len(glob.glob(p+'/*.exit.0')):
                status = TaskManager.StatusCodes['FAILED']
        return (status,exitcode)
723 
724 
725 #
726 # Test code for modules
727 #
if __name__ == '__main__':
    # Examples for manual testing; nothing is executed by default.
    #t = TaskManager('sqlite_file:test.db', True)
    #t = TaskManager('sqlite_file:test.db')
    #t = TaskManager('oracle:USER/PASSWORD@HOST')
    #t.addTask('foo','bar','VertexTemplate.py','15.4.0',55)
    #t.addTask('foo','bbb','VertexTemplate.py','15.4.0',55)
    pass
read
IovVectorMap_t read(const Folder &theFolder, const SelectionCriterion &choice, const unsigned int limit=10)
Definition: openCoraCool.cxx:569
python.TaskManager.TaskManager.execute
def execute(self, statementParts, doCommit=False, limit=None)
Definition: TaskManager.py:311
python.TaskManager.TaskManager.addTask
def addTask(self, dsName, taskName, template, release, njobs, taskpostprocsteps='', status=StatusCodes['SUBMITTED'], onDisk=OnDiskCodes['ALLONDISK'], createdTime=None, createdUser=None, createdHost=None, comment='')
Definition: TaskManager.py:395
createLinkingScheme.iter
iter
Definition: createLinkingScheme.py:62
python.TaskManager.TaskManager.__enter__
def __enter__(self)
Definition: TaskManager.py:204
python.TaskManager.JobAnalyzer.path
path
Definition: TaskManager.py:679
python.TaskManager.TaskManager.getCount
def getCount(self, what, qual=())
Definition: TaskManager.py:517
python.TaskManager.TaskAnalyzer.nJobs
nJobs
Definition: TaskManager.py:617
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TaskManager.TaskManagerDatabaseError
Definition: TaskManager.py:29
python.TaskManager.JobAnalyzer.jobList
def jobList(self)
Definition: TaskManager.py:683
python.TaskManager.TaskAnalyzer.isOnDisk
def isOnDisk(self)
Definition: TaskManager.py:625
python.TaskManager.getJobConfig
def getJobConfig(jobDir, dsName, taskName, jobName=' *')
Definition: TaskManager.py:101
python.TaskManager.appendUnique
def appendUnique(s, v)
Definition: TaskManager.py:64
python.TaskManager.TaskManager.getStatus
def getStatus(self, dsName, taskName)
Definition: TaskManager.py:458
python.TaskManager.TaskAnalyzer.nResultFiles
nResultFiles
Definition: TaskManager.py:616
python.TaskManager.DbParam.__repr__
def __repr__(self)
Definition: TaskManager.py:116
python.TaskManager.TaskAnalyzer.dsName
dsName
Definition: TaskManager.py:610
python.TaskManager.TaskAnalyzer.taskName
taskName
Definition: TaskManager.py:611
python.Utils.getRunFromName
def getRunFromName(name, default='', asInt=False)
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:13
python.TaskManager.TaskManager.getDSNames
def getDSNames(self, dsname)
Definition: TaskManager.py:582
python.TaskManager.TaskAnalyzer.nJobsCompleted
nJobsCompleted
Definition: TaskManager.py:622
python.CaloAddPedShiftConfig.type
type
Definition: CaloAddPedShiftConfig.py:42
python.TaskManager.DbParam
Definition: TaskManager.py:112
python.TaskManager.TaskManagerCheckError
Definition: TaskManager.py:26
python.TaskManager.TaskManager.dbtype
dbtype
Definition: TaskManager.py:179
python.TaskManager.TaskAnalyzer.status
status
Definition: TaskManager.py:614
python.TaskManager.TaskManager.paramstyle
paramstyle
Definition: TaskManager.py:174
checkCorrelInHIST.cursor
cursor
Definition: checkCorrelInHIST.py:26
python.TaskManager.JobAnalyzer.taskName
taskName
Definition: TaskManager.py:678
python.TaskManager.TaskManager.getTaskValue
def getTaskValue(self, dsName, taskName, what)
Definition: TaskManager.py:530
python.TaskManager.TaskManager.__exit__
def __exit__(self, exc_type, exc_value, traceback)
Definition: TaskManager.py:209
python.TaskManager.TaskManager.__init__
def __init__(self, connstring='', createDatabase=False)
Definition: TaskManager.py:166
python.TaskManager.TaskManager.getValue
def getValue(self, what, qual=())
Definition: TaskManager.py:509
python.TaskManager.DbParam.value
value
Definition: TaskManager.py:115
python.TaskManager.TaskManager
Definition: TaskManager.py:120
python.TaskManager.TaskManager.parseConnectionInfo
def parseConnectionInfo(self, connstring='')
Definition: TaskManager.py:144
python.TaskManager.TaskManager.deleteTask
def deleteTask(self, dsName, taskName)
Definition: TaskManager.py:388
python.TaskManager.DbParam.__init__
def __init__(self, value)
Definition: TaskManager.py:114
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
python.TaskManager.TaskManager.updateStatus
def updateStatus(self, dsName, taskName, status, nResultFiles, nJobs, nJobsSubmitted, nJobsRunning, nJobsPostProc, nJobsFailed, nJobsCompleted)
Definition: TaskManager.py:484
python.TaskManager.JobAnalyzer.jobStatus
def jobStatus(self, jobName)
Definition: TaskManager.py:691
python.TaskManager.JobAnalyzer.__init__
def __init__(self, jobDir, dsName, taskName)
Definition: TaskManager.py:675
python.TaskManager.getFullTaskNames
def getFullTaskNames(taskman, dsname, taskname, requireSingleTask=False, confirmWithUser=False, addWildCards=True)
Definition: TaskManager.py:72
python.TaskManager.JobAnalyzer.dsName
dsName
Definition: TaskManager.py:677
python.TaskManager.TaskAnalyzer.updateStatus
def updateStatus(self, taskman)
Definition: TaskManager.py:665
python.TaskManager.TaskAnalyzer.path
path
Definition: TaskManager.py:612
python.TaskManager.JobAnalyzer.jobDir
jobDir
Definition: TaskManager.py:676
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:25
python.TaskManager.TaskManager.getTaskDict
def getTaskDict(self, dsname, taskname, what=' *', qual=())
Definition: TaskManager.py:561
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.TaskManager.TaskManager.debug
debug
Definition: TaskManager.py:173
python.TaskManager.getKey
def getKey(d, v)
Definition: TaskManager.py:41
Trk::open
@ open
Definition: BinningType.h:40
python.TaskManager.TaskAnalyzer.__init__
def __init__(self, jobDir, dsName, taskName)
Definition: TaskManager.py:608
python.TaskManager.TaskAnalyzer.nJobsFailed
nJobsFailed
Definition: TaskManager.py:621
python.TaskManager.TaskManager.taskIter
def taskIter(self, what=' *', qual=('order by UPDATED',))
Definition: TaskManager.py:536
python.TaskManager.TaskManager.setDiskStatus
def setDiskStatus(self, dsName, taskName, onDisk)
Definition: TaskManager.py:476
python.TaskManager.TaskManager.is_managing_context
is_managing_context
Definition: TaskManager.py:175
python.TaskManager.TaskManager.dbname
dbname
Definition: TaskManager.py:177
python.TaskManager.TaskManager.getTaskNames
def getTaskNames(self, dsname, taskname=None, addWildCards=True)
Definition: TaskManager.py:589
python.TaskManager.TaskAnalyzer
Definition: TaskManager.py:605
python.TaskManager.TaskAnalyzer.nJobsRunning
nJobsRunning
Definition: TaskManager.py:619
python.TaskManager.TaskManager.dbcon
dbcon
Definition: TaskManager.py:187
python.TaskManager.getStatusClass
def getStatusClass(status)
Definition: TaskManager.py:51
python.TaskManager.TaskManager._createOracleSchema
def _createOracleSchema(self)
Definition: TaskManager.py:253
python.TaskManager.TaskAnalyzer.onDisk
onDisk
Definition: TaskManager.py:615
pickleTool.object
object
Definition: pickleTool.py:29
str
Definition: BTagTrackIpAccessor.cxx:11
python.TaskManager.TaskAnalyzer.nJobsSubmitted
nJobsSubmitted
Definition: TaskManager.py:618
python.TaskManager.TaskManager.setStatus
def setStatus(self, dsName, taskName, status, oldStatus=None)
Definition: TaskManager.py:464
calibdata.commit
bool commit
Definition: calibdata.py:831
python.TaskManager.dictFactory
def dictFactory(cursor, row)
Definition: TaskManager.py:33
python.TaskManager.TaskAnalyzer.nJobsPostProc
nJobsPostProc
Definition: TaskManager.py:620
python.TaskManager.TaskManager._createSQLiteSchema
def _createSQLiteSchema(self)
Definition: TaskManager.py:216
python.TaskManager.TaskManager.getNTasks
def getNTasks(self, qual=())
Definition: TaskManager.py:525
python.TaskManager.TaskAnalyzer.jobDir
jobDir
Definition: TaskManager.py:609
python.Utils.getUserName
def getUserName(default='UNKNOWN')
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:48
python.TaskManager.TaskAnalyzer.analyzeFiles
def analyzeFiles(self)
Definition: TaskManager.py:633
python.TaskManager.JobAnalyzer
Definition: TaskManager.py:672
python.TaskManager.TaskManager.setValue
def setValue(self, dsName, taskName, fieldName, value)
Definition: TaskManager.py:501
python.TaskManager.TaskManager.taskIterDict
def taskIterDict(self, what=' *', qual=('order by UPDATED',), limit=999999999)
Definition: TaskManager.py:543