ATLAS Offline Software
python.TaskManager.TaskManager Class Reference

Public Member Functions

def parseConnectionInfo (self, connstring='')
 
def __init__ (self, connstring='', createDatabase=False)
 
def __enter__ (self)
 
def __exit__ (self, exc_type, exc_value, traceback)
 
def execute (self, statementParts, doCommit=False, limit=None)
 
def deleteTask (self, dsName, taskName)
 
def addTask (self, dsName, taskName, template, release, njobs, taskpostprocsteps='', status=StatusCodes['SUBMITTED'], onDisk=OnDiskCodes['ALLONDISK'], createdTime=None, createdUser=None, createdHost=None, comment='')
 
def getStatus (self, dsName, taskName)
 
def setStatus (self, dsName, taskName, status, oldStatus=None)
 
def setDiskStatus (self, dsName, taskName, onDisk)
 
def updateStatus (self, dsName, taskName, status, nResultFiles, nJobs, nJobsSubmitted, nJobsRunning, nJobsPostProc, nJobsFailed, nJobsCompleted)
 
def setValue (self, dsName, taskName, fieldName, value)
 
def getValue (self, what, qual=())
 
def getCount (self, what, qual=())
 
def getNTasks (self, qual=())
 
def getTaskValue (self, dsName, taskName, what)
 
def taskIter (self, what='*', qual=('order by UPDATED',))
 
def taskIterDict (self, what='*', qual=('order by UPDATED',), limit=999999999)
 
def getTaskDict (self, dsname, taskname, what='*', qual=())
 
def getDSNames (self, dsname)
 
def getTaskNames (self, dsname, taskname=None, addWildCards=True)
 

Public Attributes

 debug
 
 paramstyle
 
 is_managing_context
 
 dbname
 
 dbtype
 
 dbcon
 

Static Public Attributes

dictionary StatusCodes
 
dictionary OnDiskCodes
 

Private Member Functions

def _createSQLiteSchema (self)
 
def _createOracleSchema (self)
 

Detailed Description

TaskManager is a tool for keeping track of JobRunner jobs.

Definition at line 120 of file TaskManager.py.

Constructor & Destructor Documentation

◆ __init__()

def python.TaskManager.TaskManager.__init__ (   self,
  connstring = '',
  createDatabase = False 
)
Constructor. connstring is a connection string specifying either a SQLite file
   ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
   ("auth_file:...") with connection information. If connstring is empty, the connection
   information will be taken from TASKDB (if set), or otherwise defaults to
   'sqlite_file:taskdata.db'.

Definition at line 166 of file TaskManager.py.

    def __init__(self, connstring='', createDatabase=False):
        """Constructor. connstring is a connection string specifying either a SQLite file
           ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
           ("auth_file:...") with connection information. If connstring is empty, the connection
           information will be taken from TASKDB (if set), or otherwise defaults to
           'sqlite_file:taskdata.db'."""

        self.debug = False
        self.paramstyle = None
        self.is_managing_context = False

        self.dbtype, self.dbname = self.__class__.parseConnectionInfo(connstring)

        if self.dbtype == 'sqlite_file':
            import sqlite3
            self.paramstyle = 'qmark'
            dbexists = os.access(self.dbname, os.F_OK)
            if dbexists and createDatabase:
                raise ValueError ('SQLite file {} exists already - remove before recreating'.format(self.dbname))
            if not (dbexists or createDatabase):
                raise ValueError ('TaskManager database not found (SQLite file {})'.format(self.dbname))
            self.dbcon = sqlite3.connect(self.dbname)
            if createDatabase:
                self._createSQLiteSchema()
        elif self.dbtype == 'oracle':
            import cx_Oracle
            self.paramstyle = 'named'
            try:
                self.dbcon = cx_Oracle.connect(self.dbname)
            except Exception:
                print ('ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
                time.sleep(10)
                self.dbcon = cx_Oracle.connect(self.dbname)
            if createDatabase:
                self._createOracleSchema()
        else:
            raise ValueError ('Unknown database type: {}'.format(self.dbtype))
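
The SQLite branch of the constructor refuses both to overwrite an existing file and to open a missing one. The following is a minimal standalone sketch of that check using only the standard library; `open_sqlite` and `demo` are hypothetical names introduced for illustration and are not part of TaskManager.

```python
import os
import sqlite3
import tempfile


def open_sqlite(dbname, create_database=False):
    """Mirror the constructor's SQLite checks (hypothetical helper, not the ATLAS API)."""
    dbexists = os.access(dbname, os.F_OK)
    if dbexists and create_database:
        raise ValueError('SQLite file {} exists already - remove before recreating'.format(dbname))
    if not (dbexists or create_database):
        raise ValueError('TaskManager database not found (SQLite file {})'.format(dbname))
    return sqlite3.connect(dbname)


def demo():
    """Return the outcome of the three interesting cases for a scratch file."""
    outcomes = []
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'taskdata.db')
        # 1. Missing file, create_database=False: rejected
        try:
            open_sqlite(path)
            outcomes.append('opened')
        except ValueError:
            outcomes.append('not found')
        # 2. Missing file, create_database=True: file is created
        con = open_sqlite(path, create_database=True)
        con.execute('create table tasks (TASKID integer primary key)')
        con.commit()
        con.close()
        outcomes.append('created')
        # 3. Existing file, create_database=True: rejected
        try:
            open_sqlite(path, create_database=True)
            outcomes.append('recreated')
        except ValueError:
            outcomes.append('exists already')
    return outcomes
```

The fourth combination (existing file, no creation requested) is the normal case and simply opens the connection.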

Member Function Documentation

◆ __enter__()

def python.TaskManager.TaskManager.__enter__ (   self)
Record that the object is being used inside a 'with' statement, so that a warning can be issued when it is not.

Definition at line 204 of file TaskManager.py.

    def __enter__(self):
        ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
        self.is_managing_context = True
        return self

◆ __exit__()

def python.TaskManager.TaskManager.__exit__ (   self,
  exc_type,
  exc_value,
  traceback 
)
Close the database connection at the end of the 'with' statement. 

Definition at line 209 of file TaskManager.py.

    def __exit__(self, exc_type, exc_value, traceback):
        ''' Close the database connection at the end of the 'with' statement. '''
        try:
            self.dbcon.close()
        except Exception:
            print ('ERROR: Unable to close database connection')
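
The `__enter__`/`__exit__` pair ties the lifetime of the database connection to a 'with' block. The sketch below reproduces the same pattern on an in-memory SQLite connection; `ManagedConnection` and `connection_is_closed` are hypothetical stand-ins, not the TaskManager class itself.

```python
import sqlite3


class ManagedConnection:
    """Hypothetical stand-in that mirrors TaskManager's context-manager methods."""

    def __init__(self):
        self.is_managing_context = False
        self.dbcon = sqlite3.connect(':memory:')

    def __enter__(self):
        # Flag checked elsewhere to decide whether to print a usage warning
        self.is_managing_context = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returning None means exceptions raised inside the block still propagate
        try:
            self.dbcon.close()
        except Exception:
            print('ERROR: Unable to close database connection')


def connection_is_closed(con):
    """Return True if using the sqlite3 connection raises ProgrammingError."""
    try:
        con.execute('select 1')
    except sqlite3.ProgrammingError:
        return True
    return False


with ManagedConnection() as managed:
    inside_flag = managed.is_managing_context
    open_inside = not connection_is_closed(managed.dbcon)

closed_after = connection_is_closed(managed.dbcon)
```

With the real class the equivalent usage is the one printed by execute(): `with TaskManager(...) as taskman:`.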

◆ _createOracleSchema()

def python.TaskManager.TaskManager._createOracleSchema (   self)
private
Create the database schema for an Oracle database.

Definition at line 253 of file TaskManager.py.

    def _createOracleSchema(self):
        """Create the database schema for an Oracle database."""
        try:
            self.dbcon.cursor().execute('drop sequence TASKS_SEQ')
            self.dbcon.cursor().execute('drop trigger TASKS_ID_TRIGGER')
            self.dbcon.cursor().execute('drop table TASKS')
        except Exception as e:
            print (e)
        self.dbcon.cursor().execute("""
            create table tasks (
                TASKID number(10,0),
                DSNAME varchar2(256),
                RUNNR number(10,0),
                TASKNAME varchar2(256),
                TEMPLATE varchar2(256),
                TASKPOSTPROCSTEPS varchar2(256),
                ATLREL varchar2(256),
                CREATED number,
                CREATED_USER varchar2(256),
                CREATED_HOST varchar2(256),
                UPDATED number,
                STATUS number(5,0),
                ONDISK number(5,0),
                LOGFILES varchar2(256),
                COOLTAGS varchar2(256),
                NRESULTFILES number(10,0) default 0,
                NJOBS number(10,0) default 0,
                NJOBS_SUBMITTED number(10,0) default -1,
                NJOBS_RUNNING number(10,0) default -1,
                NJOBS_POSTPROC number(10,0) default -1,
                NJOBS_FAILED number(10,0) default -1,
                NJOBS_COMPLETED number(10,0) default -1,
                RESULTFILES varchar2(4000),
                RESULTLINKS varchar2(4000),
                TASKCOMMENT varchar2(4000),
                STATUS_POSTPROC number(5,0) default -1,
                BASEPATH varchar2(256),
                AUXDATA varchar2(4000),
                constraint BEAMSPOT_TASKS_PK primary key (TASKID)
            )
        """)
        self.dbcon.cursor().execute('create index TASKS_DSNAME_INDX on TASKS(DSNAME)')
        self.dbcon.cursor().execute('create index TASKS_RUNNR_INDX on TASKS(RUNNR)')
        self.dbcon.cursor().execute('alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
        self.dbcon.cursor().execute('create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
        # The following statement must not have whitespace at the beginning of the line, otherwise
        # the trigger function won't be parsed correctly:
        self.dbcon.cursor().execute("""
create or replace trigger TASKS_ID_TRIGGER
before insert on TASKS
for each row
begin
    select TASKS_SEQ.nextval into :new.TASKID from dual;
end;
""")
        self.dbcon.commit()

◆ _createSQLiteSchema()

def python.TaskManager.TaskManager._createSQLiteSchema (   self)
private
Create the database schema for a SQLite3 database.

Definition at line 216 of file TaskManager.py.

    def _createSQLiteSchema(self):
        """Create the database schema for a SQLite3 database."""
        self.dbcon.cursor().executescript("""
            create table tasks (
                TASKID integer primary key autoincrement,
                DSNAME text,
                RUNNR integer,
                TASKNAME text,
                TEMPLATE text,
                TASKPOSTPROCSTEPS text,
                ATLREL text,
                CREATED real,
                CREATED_USER text,
                CREATED_HOST text,
                UPDATED real,
                STATUS integer,
                ONDISK integer,
                LOGFILES text,
                COOLTAGS text,
                NRESULTFILES integer default 0,
                NJOBS integer default 0,
                NJOBS_SUBMITTED integer default -1,
                NJOBS_RUNNING integer default -1,
                NJOBS_POSTPROC integer default -1,
                NJOBS_FAILED integer default -1,
                NJOBS_COMPLETED integer default -1,
                RESULTFILES text,
                RESULTLINKS text,
                TASKCOMMENT text,
                STATUS_POSTPROC integer,
                BASEPATH text,
                AUXDATA text
            );
        """)
        self.dbcon.commit()

◆ addTask()

def python.TaskManager.TaskManager.addTask (   self,
  dsName,
  taskName,
  template,
  release,
  njobs,
  taskpostprocsteps = '',
  status = StatusCodes['SUBMITTED'],
  onDisk = OnDiskCodes['ALLONDISK'],
  createdTime = None,
  createdUser = None,
  createdHost = None,
  comment = '' 
)
Add an entry for a new task if the task doesn't exist already. If the task exists,
   its UPDATED, NJOBS, STATUS and ONDISK fields will be updated.

Definition at line 395 of file TaskManager.py.

    def addTask(self,dsName,taskName,template,release,njobs,
                taskpostprocsteps='',
                status=StatusCodes['SUBMITTED'],
                onDisk=OnDiskCodes['ALLONDISK'],
                createdTime=None,
                createdUser=None,
                createdHost=None,
                comment=''):
        """Add an entry for a new task if the task doesn't exist already. If the task exists,
           its UPDATED, NJOBS, STATUS and ONDISK fields will be updated."""
        tstamp = time.time()
        nDefined = self.getNTasks(['where DSNAME =',DbParam(dsName),'and TASKNAME=',DbParam(taskName)])
        updateStatus = self.getNTasks(['where DSNAME = ',DbParam(dsName),'and TASKNAME=',DbParam(taskName),'and STATUS < %i' % TaskManager.StatusCodes['POSTPROCRUNNING']])

        if nDefined:
            # Task entry exists already; only update UPDATED, NJOBS, STATUS, ONDISK and possibly ATLREL
            task = self.getTaskDict(dsName,taskName)
            if task['TEMPLATE']!=template:
                print ('ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task['TEMPLATE'],template))
            else:
                updateStr = ['update TASKS set UPDATED =',DbParam(tstamp),
                             ', NJOBS = ',DbParam(task['NJOBS']+njobs),
                             ', ONDISK = ',DbParam(onDisk)]

                if release not in task['ATLREL']:
                    print ('WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task['ATLREL'],release))
                    release = '; '.join([task['ATLREL'],release])
                    updateStr += [', ATLREL = ',DbParam(release)]

                if updateStatus:
                    updateStr += [', STATUS = ',DbParam(status)]

                updateStr += ['where DSNAME = ',DbParam(dsName),'and TASKNAME = ',DbParam(taskName)]

                self.execute( updateStr)

        else:
            # New task entry
            if not createdTime:
                createdTime = tstamp
            if not createdUser:
                createdUser = getUserName()
            if not createdHost:
                createdHost = os.uname()[1]
            runnr = getRunFromName(dsName,None,True)
            self.execute( ['insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
                           DbParam(dsName),',',
                           DbParam(runnr),',',
                           DbParam(taskName),',',
                           DbParam(template),',',
                           DbParam(taskpostprocsteps),',',
                           DbParam(release),',',
                           DbParam(createdTime),',',
                           DbParam(createdUser),',',
                           DbParam(createdHost),',',
                           DbParam(tstamp),',',
                           DbParam(status),',',
                           DbParam(onDisk),',',
                           DbParam(njobs),',',
                           DbParam(comment),')'],
                          True)
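
The insert-or-update behaviour of addTask can be illustrated on a reduced table. This is a simplified sketch under stated assumptions: it uses plain sqlite3 rather than TaskManager.execute, keeps only a few columns, omits the TEMPLATE check, and commits on both branches (the listing passes doCommit=True only for the insert). `make_db` and `add_task` are hypothetical names.

```python
import sqlite3
import time

POSTPROCRUNNING = 6  # value of StatusCodes['POSTPROCRUNNING'] in this class


def make_db():
    """Create an in-memory table with a subset of the TASKS columns."""
    con = sqlite3.connect(':memory:')
    con.execute('create table TASKS (DSNAME text, TASKNAME text, ATLREL text, '
                'UPDATED real, STATUS integer, NJOBS integer)')
    return con


def add_task(con, ds_name, task_name, release, njobs, status=1):
    """Insert a task or update an existing one (simplified sketch of addTask)."""
    now = time.time()
    row = con.execute(
        'select NJOBS, ATLREL, STATUS from TASKS where DSNAME = ? and TASKNAME = ?',
        (ds_name, task_name)).fetchone()
    if row is None:
        con.execute(
            'insert into TASKS (DSNAME, TASKNAME, ATLREL, UPDATED, STATUS, NJOBS) '
            'values (?, ?, ?, ?, ?, ?)',
            (ds_name, task_name, release, now, status, njobs))
    else:
        old_njobs, old_release, old_status = row
        # A release not yet recorded is appended with '; ' (substring test, as in the listing)
        if release not in old_release:
            release = '; '.join([old_release, release])
        else:
            release = old_release
        # STATUS is only overwritten while the task has not reached POSTPROCRUNNING
        new_status = status if old_status < POSTPROCRUNNING else old_status
        # NJOBS accumulates across calls
        con.execute(
            'update TASKS set UPDATED = ?, NJOBS = ?, ATLREL = ?, STATUS = ? '
            'where DSNAME = ? and TASKNAME = ?',
            (now, old_njobs + njobs, release, new_status, ds_name, task_name))
    con.commit()
```

Calling `add_task` twice for the same (DSNAME, TASKNAME) pair therefore leaves a single row whose NJOBS is the sum of both calls.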

◆ deleteTask()

def python.TaskManager.TaskManager.deleteTask (   self,
  dsName,
  taskName 
)
Delete a task entry from the taskmanager database.

Definition at line 388 of file TaskManager.py.

    def deleteTask(self,dsName,taskName):
        """Delete a task entry from the taskmanager database."""
        q = [ 'delete from TASKS where DSNAME =',DbParam(dsName),'and TASKNAME =',DbParam(taskName) ]
        cursor = self.execute(q,True)
        return cursor.rowcount

◆ execute()

def python.TaskManager.TaskManager.execute (   self,
  statementParts,
  doCommit = False,
  limit = None 
)
Execute a SQL statement passed as a list or tuple of statement parts, where each
   part is either a partial SQL string, or an object of type DbParam specifying a parameter
   for the SQL statement. The actual SQL statement is assembled (using the parameter style
   of the current database) and executed, and the resulting cursor is returned.
   Loosely follows the method discussed in the Python Cookbook.
   WARNING: At present, limit doesn't work when ordering rows for Oracle!

Definition at line 311 of file TaskManager.py.

    def execute(self,statementParts, doCommit=False, limit=None):
        """Execute a SQL statement passed as a list or tuple of statement parts, where each
           part is either a partial SQL string, or an object of type DbParam specifying a parameter
           for the SQL statement. The actual SQL statement is assembled (using the parameter style
           of the current database) and executed, and the resulting cursor is returned.
           Loosely follows the method discussed in the Python Cookbook.
           WARNING: At present, limit doesn't work when ordering rows for Oracle!"""
        if not self.is_managing_context:
            print ('**WARN** TaskManager will keep the database connection open until it is deleted.')
            print ('**INFO** TaskManager should generally only be used inside a with statement:')
            print ('**INFO** with TaskManager(...) as taskman:')
            print ('**INFO** # do something ...')

        if not statementParts:
            return None
        if isinstance(statementParts,str):
            raise TypeError ('Must pass list or tuple to TaskManager.execute')
        for p in statementParts:
            if not (isinstance(p,DbParam) or isinstance(p,str)):
                raise ValueError ('Can only pass SQL string fragments and DbParam objects in list'
                                  'to TaskManager.execute, found %s with "%s"' % (type(p),str(p)))
        sqlParts = None
        params = None

        if self.paramstyle=='qmark':
            sqlParts = []
            params = []
            for p in statementParts:
                if isinstance(p,DbParam):
                    sqlParts.append('?')
                    params.append(p.value)
                else:
                    sqlParts.append(p)

        if self.paramstyle=='named':
            sqlParts = []
            params = {}
            for p in statementParts:
                if isinstance(p,DbParam):
                    name = 'p%d' % len(params)
                    sqlParts.append(':%s' % name)
                    params[name] = p.value
                else:
                    sqlParts.append(p)

        if sqlParts is None:
            raise ValueError ('Unknown SQL parameter style %s' % self.paramstyle)
            return None
        sql = ' '.join(sqlParts)

        # Handle database-dependent limit clause (PRELIMINARY)
        if limit:
            if self.dbtype=='oracle':
                # WARNING: the following doesn't work always when ordering rows!
                if 'order by' in sql: # WARNING: works only if order by is in lowercase!
                    sqlParts2 = sql.split('order by')
                    sql = '%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
                else:
                    sql += ' and ROWNUM <= %i' % limit
            else:
                sql += ' limit %i' % limit

        if self.debug:
            print ('\nExecuting SQL statement: ',sql)
            print (' with parameters: ',params)

        cursor = self.dbcon.cursor()
        try:
            cursor.execute(sql,params)
            if doCommit:
                self.dbcon.commit()
        except Exception as e:
            msg = '\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
            raise TaskManagerDatabaseError (msg)
        return cursor
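
The statement assembly step is easiest to see in isolation. The sketch below extracts just that step for the two parameter styles used by this class ('qmark' for SQLite, 'named' for Oracle). The `DbParam` class here is a hypothetical minimal stand-in (the listing only relies on its `value` attribute), and `assemble` is an illustrative name, not a TaskManager method.

```python
class DbParam:
    """Hypothetical minimal stand-in: only the `value` attribute is needed here."""

    def __init__(self, value):
        self.value = value


def assemble(statement_parts, paramstyle):
    """Build (sql, params) from string fragments and DbParam objects."""
    sql_parts = []
    if paramstyle == 'qmark':
        # Positional '?' placeholders with a list of values
        params = []
        for p in statement_parts:
            if isinstance(p, DbParam):
                sql_parts.append('?')
                params.append(p.value)
            else:
                sql_parts.append(p)
    elif paramstyle == 'named':
        # ':p0', ':p1', ... placeholders with a dict of values
        params = {}
        for p in statement_parts:
            if isinstance(p, DbParam):
                name = 'p%d' % len(params)
                sql_parts.append(':%s' % name)
                params[name] = p.value
            else:
                sql_parts.append(p)
    else:
        raise ValueError('Unknown SQL parameter style %s' % paramstyle)
    # Fragments are joined with single spaces, so they need no padding of their own
    return ' '.join(sql_parts), params


parts = ['select STATUS from TASKS where DSNAME =', DbParam('data.run1'),
         'and TASKNAME =', DbParam('T1')]
qmark_sql, qmark_params = assemble(parts, 'qmark')
named_sql, named_params = assemble(parts, 'named')
```

Because values travel separately from the SQL text, they never need quoting or escaping; only the string fragments become part of the statement.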

◆ getCount()

def python.TaskManager.TaskManager.getCount (   self,
  what,
  qual = () 
)

Definition at line 517 of file TaskManager.py.

    def getCount(self, what, qual=()):
        # Except when using count(distinct(...)) which is not supported by earlier sqlite versions,
        # better use getNTasks
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        return len(self.execute(q).fetchall())
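
getCount fetches the selected rows and counts them on the client, whereas getNTasks lets the database count. A small sketch of the two approaches on an in-memory SQLite table (the table contents are made up for illustration):

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.execute('create table TASKS (DSNAME text, TASKNAME text)')
con.executemany('insert into TASKS values (?, ?)',
                [('data.run1', 'A'), ('data.run1', 'B'), ('data.run2', 'A')])

# getCount-style: select the (distinct) rows and count them in Python
n_datasets = len(con.execute('select distinct(DSNAME) from TASKS').fetchall())

# getNTasks-style: the database returns a single row holding the count
n_tasks = con.execute('select count(*) from TASKS').fetchone()[0]
```

The client-side variant transfers every matching row, which is why the source comment recommends getNTasks whenever a distinct count is not required.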

◆ getDSNames()

def python.TaskManager.TaskManager.getDSNames (   self,
  dsname 
)
Get all DSNAMEs for a given (partial) data set name dsname.

Definition at line 582 of file TaskManager.py.

    def getDSNames(self,dsname):
        """Get all DSNAMEs for a given (partial) data set name dsname."""
        #q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by UPDATED" % dsname ]
        q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by DSNAME" % dsname ]
        return self.execute(q).fetchall()

◆ getNTasks()

def python.TaskManager.TaskManager.getNTasks (   self,
  qual = () 
)

Definition at line 525 of file TaskManager.py.

    def getNTasks(self, qual=()):
        q = [ 'select count(*) from TASKS' ]
        q.extend(qual)
        return self.execute(q).fetchone()[0]

◆ getStatus()

def python.TaskManager.TaskManager.getStatus (   self,
  dsName,
  taskName 
)
Get status of a task.

Definition at line 458 of file TaskManager.py.

    def getStatus(self,dsName,taskName):
        """Get status of a task."""
        cursor = self.execute( ['select STATUS from TASKS where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName) ] )
        return cursor.fetchone()[0]

◆ getTaskDict()

def python.TaskManager.TaskManager.getTaskDict (   self,
  dsname,
  taskname,
  what = '*',
  qual = () 
)

Definition at line 561 of file TaskManager.py.

    def getTaskDict(self, dsname, taskname, what='*', qual=()):
        # NOTE: We could implement this more elegantly using row_factory (sqlite3)
        #       or rowfactory (Oracle), but this would come at the expense of a
        #       direct database dependence.
        if qual:
            q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME =', DbParam(taskname) ]
            q.extend(qual)
        else:
            if self.dbtype=='oracle':
                # WARNING: DOES THIS ALWAYS WORK? There anyway should only be a single task here?
                q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'and ROWNUM <= 1 order by UPDATED' ]
            else:
                q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'order by UPDATED limit 1' ]
        cursor = self.execute(q)
        row = cursor.fetchone()
        if row is None:
            return {}
        else:
            return dictFactory(cursor,row)

◆ getTaskNames()

def python.TaskManager.TaskManager.getTaskNames (   self,
  dsname,
  taskname = None,
  addWildCards = True 
)
Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
   may be any partial dataset and task name, respectively.

Definition at line 589 of file TaskManager.py.

    def getTaskNames(self,dsname,taskname=None,addWildCards=True):
        """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
           may be any partial dataset and task name, respectively."""
        if addWildCards:
            if taskname:
                q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
            else:
                q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
        else:
            if taskname:
                q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
            else:
                q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
        return self.execute(q).fetchall()
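
In the listing the LIKE patterns are interpolated into the SQL text, where `'%%%s%%' % name` expands to `'%name%'`. The sketch below shows that expansion and, as a swapped-in alternative that is not what the listing does, the same query with the pattern passed as a bound parameter. `task_names` is a hypothetical function and the sample rows are made up.

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.execute('create table TASKS (DSNAME text, TASKNAME text)')
con.executemany('insert into TASKS values (?, ?)',
                [('data18.00123.express', 'DB_BEAMSPOT.v1'),
                 ('data18.00123.express', 'REPROC.v1'),
                 ('data18.00456.calib', 'DB_BEAMSPOT.v1')])


def task_names(con, dsname, taskname=None, add_wildcards=True):
    """Sketch of getTaskNames using bound LIKE patterns instead of interpolation."""
    def pattern(s):
        return '%' + s + '%' if add_wildcards else s
    sql = 'select DSNAME,TASKNAME from TASKS where DSNAME like ?'
    params = [pattern(dsname)]
    if taskname:
        sql += ' and TASKNAME like ?'
        params.append(pattern(taskname))
    sql += ' order by DSNAME,TASKNAME'
    return con.execute(sql, params).fetchall()


# What the interpolation in the listing produces for a partial name
expanded = "DSNAME like '%%%s%%'" % '00123'

partial = task_names(con, '00123', 'BEAMSPOT')          # substring match on both names
exact = task_names(con, '00123', add_wildcards=False)   # no wildcards: whole name must match
```

Without wildcards the caller is expected to supply the full name or its own `%` and `_` wildcards, which is why the second query finds nothing.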

◆ getTaskValue()

def python.TaskManager.TaskManager.getTaskValue (   self,
  dsName,
  taskName,
  what 
)
Get a single value from the task database. If the query results in more than one value, only
   the first value is returned.

Definition at line 530 of file TaskManager.py.

    def getTaskValue(self, dsName,taskName, what):
        """Get a single value from the task database. If the query results in more than one value, only
           the first value is returned."""
        q = [ 'select %s from TASKS where DSNAME =' % what , DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
        return self.execute(q).fetchone()[0]

◆ getValue()

def python.TaskManager.TaskManager.getValue (   self,
  what,
  qual = () 
)
Get a single value from the task database. If the query results in more than one value, only
   the first value is returned.

Definition at line 509 of file TaskManager.py.

    def getValue(self, what, qual=()):
        """Get a single value from the task database. If the query results in more than one value, only
           the first value is returned."""
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        return self.execute(q).fetchone()[0]

◆ parseConnectionInfo()

def python.TaskManager.TaskManager.parseConnectionInfo (   self,
  connstring = '' 
)

Definition at line 144 of file TaskManager.py.

    def parseConnectionInfo(self, connstring=''):
        if not connstring:
            connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')

        try:
            dbtype, dbname = connstring.split(':', 1)
        except Exception:
            raise ValueError ('Illegal database connection string {}'.format(connstring))

        if dbtype == 'auth_file':
            # dbname is a file with the actual connection information
            authfile = dbname
            try:
                with open(authfile) as af:
                    connstring = af.read().strip()
                dbtype, dbname = connstring.split(':', 1)
            except Exception:
                raise ValueError ('Invalid authorization file {} (not readable or invalid format)'.format(authfile))

        return dbtype, dbname
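
The parsing rules can be tried out with a standalone function. This is a sketch under stated assumptions: `parse_connection_info` and `demo_auth_file` are hypothetical names, the exception types are narrowed relative to the listing, and the Oracle string is a made-up placeholder rather than a real connection string.

```python
import os
import tempfile


def parse_connection_info(connstring=''):
    """Standalone sketch of the parsing logic (hypothetical function name)."""
    if not connstring:
        connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')
    try:
        # Split only at the first ':' so the remainder may itself contain colons
        dbtype, dbname = connstring.split(':', 1)
    except ValueError:
        raise ValueError('Illegal database connection string {}'.format(connstring))
    if dbtype == 'auth_file':
        # dbname names a file whose content is the actual connection string
        authfile = dbname
        try:
            with open(authfile) as af:
                connstring = af.read().strip()
            dbtype, dbname = connstring.split(':', 1)
        except (OSError, ValueError):
            raise ValueError('Invalid authorization file {} (not readable or invalid format)'.format(authfile))
    return dbtype, dbname


def demo_auth_file():
    """Write a throwaway authorization file and resolve it through 'auth_file:'."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, 'auth.txt')
        with open(path, 'w') as f:
            f.write('oracle://user/password@db\n')  # made-up placeholder
        return parse_connection_info('auth_file:' + path)
```

Note that everything after the first colon, including the leading `//` of an `oracle://...` string, ends up in the second element of the returned pair.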

◆ setDiskStatus()

def python.TaskManager.TaskManager.setDiskStatus (   self,
  dsName,
  taskName,
  onDisk 
)
Update the onDisk-status of a task.

Definition at line 476 of file TaskManager.py.

    def setDiskStatus(self,dsName,taskName,onDisk):
        """Update the onDisk-status of a task."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', ONDISK =',DbParam(onDisk),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount

◆ setStatus()

def python.TaskManager.TaskManager.setStatus (   self,
  dsName,
  taskName,
  status,
  oldStatus = None 
)
Set the status of a task. If oldStatus is set, the task must be in the designated status.

Definition at line 464 of file TaskManager.py.

    def setStatus(self,dsName,taskName,status,oldStatus=None):
        """Set the status of a task. If oldStatus is set, the task must be in the designated status."""
        tstamp = time.time()
        if oldStatus:
            cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
                                    'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName),'and STATUS =',DbParam(oldStatus)], True)
        else:
            cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
                                    'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount
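
With oldStatus set, setStatus behaves like a compare-and-set: the update only applies if the task is still in the expected state, and the returned row count tells the caller whether it did. A minimal sqlite3 sketch follows; `set_status` is a hypothetical function. One deliberate difference is labeled in the code: the listing tests `if oldStatus:`, so an oldStatus of 0 (CONFIGURED) is treated like None, whereas the sketch compares against None, which is an assumption about the intent.

```python
import sqlite3
import time

con = sqlite3.connect(':memory:')
con.execute('create table TASKS (DSNAME text, TASKNAME text, STATUS integer, UPDATED real)')
con.execute("insert into TASKS values ('data.run1', 'T1', 1, 0)")  # 1 = SUBMITTED


def set_status(con, ds_name, task_name, status, old_status=None):
    """Conditional status update; returns the number of rows changed."""
    sql = 'update TASKS set UPDATED = ?, STATUS = ? where DSNAME = ? and TASKNAME = ?'
    params = [time.time(), status, ds_name, task_name]
    # Assumption: compare against None so that old_status=0 is honoured
    if old_status is not None:
        sql += ' and STATUS = ?'
        params.append(old_status)
    cursor = con.execute(sql, params)
    con.commit()
    return cursor.rowcount


stale = set_status(con, 'data.run1', 'T1', 10, old_status=2)  # task is not RUNNING: no change
fresh = set_status(con, 'data.run1', 'T1', 2, old_status=1)   # task is SUBMITTED: updated
final_status = con.execute('select STATUS from TASKS').fetchone()[0]
```

A return value of 0 thus signals that another process changed the task first, or that no such task exists.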

◆ setValue()

def python.TaskManager.TaskManager.setValue (   self,
  dsName,
  taskName,
  fieldName,
  value 
)
Update a field of a given task.

Definition at line 501 of file TaskManager.py.

    def setValue(self,dsName,taskName,fieldName,value):
        """Update a field of a given task."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', %s =' % fieldName,DbParam(value),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount

◆ taskIter()

def python.TaskManager.TaskManager.taskIter (   self,
  what = '*',
  qual = ('order by UPDATED',) 
)

Definition at line 536 of file TaskManager.py.

    def taskIter(self, what='*', qual=('order by UPDATED',)):
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        cursor = self.execute(q)
        return iter(cursor.fetchone,None)
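
taskIter relies on the two-argument form of the built-in `iter`, which keeps calling a function until it returns a sentinel value. A short sketch of that idiom on an in-memory SQLite cursor (table contents made up for illustration):

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.execute('create table TASKS (TASKNAME text, UPDATED real)')
con.executemany('insert into TASKS values (?, ?)', [('B', 2.0), ('A', 1.0)])

cursor = con.execute('select TASKNAME from TASKS order by UPDATED')
# iter(callable, sentinel) calls cursor.fetchone() until it returns None
rows = list(iter(cursor.fetchone, None))
```

Each item is a plain row tuple; rows are pulled from the cursor one at a time rather than loaded with a single fetchall().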

◆ taskIterDict()

def python.TaskManager.TaskManager.taskIterDict (   self,
  what = '*',
  qual = ('order by UPDATED',),
  limit = 999999999 
)

Definition at line 543 of file TaskManager.py.

    def taskIterDict(self, what='*', qual=('order by UPDATED',), limit=999999999):
        # NOTE: We could implement this more elegantly using row_factory (sqlite3)
        #       or rowfactory (Oracle), but this would come at the expense of a
        #       direct database dependence. Also, the primitive limit on the
        #       number of items returned by the generator avoids database-
        #       dependent SQL limit statements (until limit in TaskManager.execute
        #       is fully implemented also for Oracle.
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        cursor = self.execute(q)
        n = 0
        while n<limit:
            row = cursor.fetchone()
            if row is None: break
            n += 1
            yield dictFactory(cursor,row)
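
The generator applies its limit on the client side, after ordering has been done by the database, which sidesteps the database-specific limit syntax. The sketch below shows the idea with sqlite3. The body of `dict_factory` is an assumption: the implementation of dictFactory is not shown here, and building the dictionary from `cursor.description` is simply the usual DB-API way to do it. `iter_dicts` is a hypothetical name.

```python
import sqlite3


def dict_factory(cursor, row):
    """Assumed behaviour of dictFactory: map column names to row values."""
    return {col[0]: value for col, value in zip(cursor.description, row)}


def iter_dicts(cursor, limit=999999999):
    """Yield at most `limit` rows as dictionaries, in the manner of taskIterDict."""
    n = 0
    while n < limit:
        row = cursor.fetchone()
        if row is None:
            break
        n += 1
        yield dict_factory(cursor, row)


con = sqlite3.connect(':memory:')
con.execute('create table TASKS (DSNAME text, TASKNAME text, UPDATED real)')
con.executemany('insert into TASKS values (?, ?, ?)',
                [('data.run1', 'A', 3.0), ('data.run1', 'B', 1.0), ('data.run2', 'A', 2.0)])
cursor = con.execute('select DSNAME, TASKNAME from TASKS order by UPDATED')
first_two = list(iter_dicts(cursor, limit=2))
```

Since the rows arrive already sorted, stopping after `limit` items yields the first rows of the ordered result on either database.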

◆ updateStatus()

def python.TaskManager.TaskManager.updateStatus (   self,
  dsName,
  taskName,
  status,
  nResultFiles,
  nJobs,
  nJobsSubmitted,
  nJobsRunning,
  nJobsPostProc,
  nJobsFailed,
  nJobsCompleted 
)
Update task status information including number of jobs in different states.

Definition at line 484 of file TaskManager.py.

    def updateStatus(self,dsName,taskName,
                     status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
        """Update task status information including number of jobs in different states."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),
                                ', STATUS =',DbParam(status),
                                ', NRESULTFILES =',DbParam(nResultFiles),
                                ', NJOBS =',DbParam(nJobs),
                                ', NJOBS_SUBMITTED =',DbParam(nJobsSubmitted),
                                ', NJOBS_RUNNING =',DbParam(nJobsRunning),
                                ', NJOBS_POSTPROC =',DbParam(nJobsPostProc),
                                ', NJOBS_FAILED =',DbParam(nJobsFailed),
                                ', NJOBS_COMPLETED =',DbParam(nJobsCompleted),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount

Member Data Documentation

◆ dbcon

python.TaskManager.TaskManager.dbcon

Definition at line 187 of file TaskManager.py.

◆ dbname

python.TaskManager.TaskManager.dbname

Definition at line 177 of file TaskManager.py.

◆ dbtype

python.TaskManager.TaskManager.dbtype

Definition at line 179 of file TaskManager.py.

◆ debug

python.TaskManager.TaskManager.debug

Definition at line 173 of file TaskManager.py.

◆ is_managing_context

python.TaskManager.TaskManager.is_managing_context

Definition at line 175 of file TaskManager.py.

◆ OnDiskCodes

dictionary python.TaskManager.TaskManager.OnDiskCodes
static
Initial value:
= { 'UNKNOWN': -1,
'ALLONDISK': 0,
'RESULTSONDISK': 1,
# Codes above are for jobs whose task directory is on disk
'ARCHIVED': 10,
'DELETED': 11 }

Definition at line 136 of file TaskManager.py.

◆ paramstyle

python.TaskManager.TaskManager.paramstyle

Definition at line 174 of file TaskManager.py.

◆ StatusCodes

dictionary python.TaskManager.TaskManager.StatusCodes
static
Initial value:
= { 'UNKNOWN': -1,
'CONFIGURED': 0,
'SUBMITTED': 1,
'RUNNING': 2,
'PARTIALFAILED': 3,
# Codes above are considered tasks whose status needs to
# be monitored and updated
'POSTPROCESSING': 5,
'POSTPROCRUNNING': 6,
'POSTPROCFAILED': 7,
'COMPLETED': 10,
'FAILED': 11}

Definition at line 123 of file TaskManager.py.
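
The numeric ordering of the status codes carries meaning: the comment inside the dictionary marks every code below POSTPROCESSING as one that still needs monitoring. The sketch below copies the dictionary and adds two hypothetical helpers (`status_name`, `needs_monitoring`) to illustrate that reading; how the monitoring code actually queries these values is not shown on this page, so the threshold test is an assumption.

```python
STATUS_CODES = {'UNKNOWN': -1, 'CONFIGURED': 0, 'SUBMITTED': 1, 'RUNNING': 2,
                'PARTIALFAILED': 3, 'POSTPROCESSING': 5, 'POSTPROCRUNNING': 6,
                'POSTPROCFAILED': 7, 'COMPLETED': 10, 'FAILED': 11}


def status_name(code):
    """Reverse lookup from numeric code to name (hypothetical helper)."""
    for name, value in STATUS_CODES.items():
        if value == code:
            return name
    return 'UNKNOWN'


def needs_monitoring(code):
    """True for the codes listed before the comment, i.e. below POSTPROCESSING (assumed rule)."""
    return code < STATUS_CODES['POSTPROCESSING']
```

For example, `needs_monitoring(STATUS_CODES['PARTIALFAILED'])` is true, while a COMPLETED task is no longer monitored.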


The documentation for this class was generated from the following file:
TaskManager.py