ATLAS Offline Software
Public Member Functions | Public Attributes | Static Public Attributes | Private Member Functions | List of all members
python.TaskManager.TaskManager Class Reference
Collaboration diagram for python.TaskManager.TaskManager:

Public Member Functions

def parseConnectionInfo (self, connstring='')
 
def __init__ (self, connstring='', createDatabase=False)
 
def __enter__ (self)
 
def __exit__ (self, exc_type, exc_value, traceback)
 
def execute (self, statementParts, doCommit=False, limit=None)
 
def deleteTask (self, dsName, taskName)
 
def addTask (self, dsName, taskName, template, release, njobs, taskpostprocsteps='', status=StatusCodes['SUBMITTED'], onDisk=OnDiskCodes['ALLONDISK'], createdTime=None, createdUser=None, createdHost=None, comment='')
 
def getStatus (self, dsName, taskName)
 
def setStatus (self, dsName, taskName, status, oldStatus=None)
 
def setDiskStatus (self, dsName, taskName, onDisk)
 
def updateStatus (self, dsName, taskName, status, nResultFiles, nJobs, nJobsSubmitted, nJobsRunning, nJobsPostProc, nJobsFailed, nJobsCompleted)
 
def setValue (self, dsName, taskName, fieldName, value)
 
def getValue (self, what, qual=())
 
def getCount (self, what, qual=())
 
def getNTasks (self, qual=())
 
def getTaskValue (self, dsName, taskName, what)
 
def taskIter (self, what=' *', qual=('order by UPDATED',))
 
def taskIterDict (self, what=' *', qual=('order by UPDATED',), limit=999999999)
 
def getTaskDict (self, dsname, taskname, what=' *', qual=())
 
def getDSNames (self, dsname)
 
def getTaskNames (self, dsname, taskname=None, addWildCards=True)
 

Public Attributes

 debug
 
 paramstyle
 
 is_managing_context
 
 dbname
 
 dbtype
 
 dbcon
 

Static Public Attributes

dictionary StatusCodes
 
dictionary OnDiskCodes
 

Private Member Functions

def _createSQLiteSchema (self)
 
def _createOracleSchema (self)
 

Detailed Description

TaskManager is tool for keeping track of JobRunner jobs.

Definition at line 122 of file TaskManager.py.

Constructor & Destructor Documentation

◆ __init__()

def python.TaskManager.TaskManager.__init__ (   self,
  connstring = '',
  createDatabase = False 
)
Constructor. connstring is a connection string specifying either a SQLite file
   ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
   ("auth_file:...") with connection information. If connstring is empty, the connection
   information will be taken from TASKDB (if set), or otherwise defaults to
   'sqlite_file:taskdata.db'.

Definition at line 168 of file TaskManager.py.

168  def __init__(self, connstring='', createDatabase=False):
169  """Constructor. connstring is a connection string specifying either a SQLite file
170  ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
171  ("auth_file:...") with connection information. If connstring is empty, the connection
172  information will be taken from TASKDB (if set), or otherwise defaults to
173  'sqlite_file:taskdata.db'."""
174 
175  self.debug = False
176  self.paramstyle = None
177  self.is_managing_context = False
178 
179  self.dbtype, self.dbname = self.__class__.parseConnectionInfo(connstring)
180 
181  if self.dbtype == 'sqlite_file':
182  import sqlite3
183  self.paramstyle = 'qmark'
184  dbexists = os.access(self.dbname, os.F_OK)
185  if dbexists and createDatabase:
186  raise ValueError ('SQLite file {} exists already - remove before recreating'.format(self.dbname))
187  if not (dbexists or createDatabase):
188  raise ValueError ('TaskManager database not found (SQLite file {})'.format(self.dbname))
189  self.dbcon = sqlite3.connect(self.dbname)
190  if createDatabase:
191  self._createSQLiteSchema()
192  elif self.dbtype == 'oracle':
193  import cx_Oracle
194  self.paramstyle = 'named'
195  try:
196  self.dbcon = cx_Oracle.connect(self.dbname)
197  except Exception:
198  print ('ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
199  time.sleep(10)
200  self.dbcon = cx_Oracle.connect(self.dbname)
201  if createDatabase:
202  self._createOracleSchema()
203  else:
204  raise ValueError ('Unknown database type: {}'.format(self.dbtype))
205 

Member Function Documentation

◆ __enter__()

def python.TaskManager.TaskManager.__enter__ (   self)
Remember that we're inside a 'with' statement so we can warn otherwise: 

Definition at line 206 of file TaskManager.py.

206  def __enter__(self):
207  ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
208  self.is_managing_context = True
209  return self
210 

◆ __exit__()

def python.TaskManager.TaskManager.__exit__ (   self,
  exc_type,
  exc_value,
  traceback 
)
Close the database connection at the end of the 'with' statement. 

Definition at line 211 of file TaskManager.py.

211  def __exit__(self, exc_type, exc_value, traceback):
212  ''' Close the database connection at the end of the 'with' statement. '''
213  try:
214  self.dbcon.close()
215  except Exception:
216  print ('ERROR: Unable to close database connection')
217 

◆ _createOracleSchema()

def python.TaskManager.TaskManager._createOracleSchema (   self)
private
Create the database schema for an Oracle database.

Definition at line 255 of file TaskManager.py.

255  def _createOracleSchema(self):
256  """Create the database schema for an Oracle database."""
257  try:
258  self.dbcon.cursor().execute('drop sequence TASKS_SEQ')
259  self.dbcon.cursor().execute('drop trigger TASKS_ID_TRIGGER')
260  self.dbcon.cursor().execute('drop table TASKS')
261  except Exception as e:
262  print (e)
263  self.dbcon.cursor().execute("""
264  create table tasks (
265  TASKID number(10,0),
266  DSNAME varchar2(256),
267  RUNNR number(10,0),
268  TASKNAME varchar2(256),
269  TEMPLATE varchar2(256),
270  TASKPOSTPROCSTEPS varchar2(256),
271  ATLREL varchar2(256),
272  CREATED number,
273  CREATED_USER varchar2(256),
274  CREATED_HOST varchar2(256),
275  UPDATED number,
276  STATUS number(5,0),
277  ONDISK number(5,0),
278  LOGFILES varchar2(256),
279  COOLTAGS varchar2(256),
280  NRESULTFILES number(10,0) default 0,
281  NJOBS number(10,0) default 0,
282  NJOBS_SUBMITTED number(10,0) default -1,
283  NJOBS_RUNNING number(10,0) default -1,
284  NJOBS_POSTPROC number(10,0) default -1,
285  NJOBS_FAILED number(10,0) default -1,
286  NJOBS_COMPLETED number(10,0) default -1,
287  RESULTFILES varchar2(4000),
288  RESULTLINKS varchar2(4000),
289  TASKCOMMENT varchar2(4000),
290  STATUS_POSTPROC number(5,0) default -1,
291  BASEPATH varchar2(256),
292  AUXDATA varchar2(4000),
293  constraint BEAMSPOT_TASKS_PK primary key (TASKID)
294  )
295  """)
296  self.dbcon.cursor().execute('create index TASKS_DSNAME_INDX on TASKS(DSNAME)')
297  self.dbcon.cursor().execute('create index TASKS_RUNNR_INDX on TASKS(RUNNR)')
298  self.dbcon.cursor().execute('alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
299  self.dbcon.cursor().execute('create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
300  # The following statement must not have whitespace at the beginning of the line, otherwise
301  # the trigger function won't be parsed correctly:
302  self.dbcon.cursor().execute("""
303 create or replace trigger TASKS_ID_TRIGGER
304 before insert on TASKS
305 for each row
306 begin
307  select TASKS_SEQ.nextval into :new.TASKID from dual;
308 end;
309 """)
310  self.dbcon.commit()
311 
312 

◆ _createSQLiteSchema()

def python.TaskManager.TaskManager._createSQLiteSchema (   self)
private
Create the database schema for a SQLite3 database.

Definition at line 218 of file TaskManager.py.

218  def _createSQLiteSchema(self):
219  """Create the database schema for a SQLite3 database."""
220  self.dbcon.cursor().executescript("""
221  create table tasks (
222  TASKID integer primary key autoincrement,
223  DSNAME text,
224  RUNNR integer,
225  TASKNAME text,
226  TEMPLATE text,
227  TASKPOSTPROCSTEPS text,
228  ATLREL text,
229  CREATED real,
230  CREATED_USER text,
231  CREATED_HOST text,
232  UPDATED real,
233  STATUS integer,
234  ONDISK integer,
235  LOGFILES text,
236  COOLTAGS text,
237  NRESULTFILES integer default 0,
238  NJOBS integer default 0,
239  NJOBS_SUBMITTED integer default -1,
240  NJOBS_RUNNING integer default -1,
241  NJOBS_POSTPROC integer default -1,
242  NJOBS_FAILED integer default -1,
243  NJOBS_COMPLETED integer default -1,
244  RESULTFILES text,
245  RESULTLINKS text,
246  TASKCOMMENT text,
247  STATUS_POSTPROC integer,
248  BASEPATH text,
249  AUXDATA text
250  );
251  """)
252  self.dbcon.commit()
253 
254 

◆ addTask()

def python.TaskManager.TaskManager.addTask (   self,
  dsName,
  taskName,
  template,
  release,
  njobs,
  taskpostprocsteps = '',
  status = StatusCodes['SUBMITTED'],
  onDisk = OnDiskCodes['ALLONDISK'],
  createdTime = None,
  createdUser = None,
  createdHost = None,
  comment = '' 
)
Add an entry for a new task if the task doesn't exist already. If the task exists,
   its UPDATED, NJOBS, STATUS and ONDISK fields will be updated.

Definition at line 397 of file TaskManager.py.

397  def addTask(self,dsName,taskName,template,release,njobs,
398  taskpostprocsteps='',
399  status=StatusCodes['SUBMITTED'],
400  onDisk=OnDiskCodes['ALLONDISK'],
401  createdTime=None,
402  createdUser=None,
403  createdHost=None,
404  comment=''):
405  """Add an entry for a new task if the task doesn't exist already. If the task exists,
406  its UPDATED, NJOBS, STATUS and ONDISK fields will be updated."""
407  tstamp = time.time()
408  nDefined = self.getNTasks(['where DSNAME =',DbParam(dsName),'and TASKNAME=',DbParam(taskName)])
409  updateStatus = self.getNTasks(['where DSNAME = ',DbParam(dsName),'and TASKNAME=',DbParam(taskName),'and STATUS < %i' % TaskManager.StatusCodes['POSTPROCRUNNING']])
410 
411  if nDefined:
412  # Task entry exists already; only update UPDATED, NJOBS, STATUS, ONDISK and possibly ATLREL
413  task = self.getTaskDict(dsName,taskName)
414  if task['TEMPLATE']!=template:
415  print ('ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task['TEMPLATE'],template))
416  else:
417  updateStr = ['update TASKS set UPDATED =',DbParam(tstamp),
418  ', NJOBS = ',DbParam(task['NJOBS']+njobs),
419  ', ONDISK = ',DbParam(onDisk)]
420 
421  if release not in task['ATLREL']:
422  print ('WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task['ATLREL'],release))
423  release = '; '.join([task['ATLREL'],release])
424  updateStr += [', ATLREL = ',DbParam(release)]
425 
426  if updateStatus:
427  updateStr += [', STATUS = ',DbParam(status)]
428 
429  updateStr += ['where DSNAME = ',DbParam(dsName),'and TASKNAME = ',DbParam(taskName)]
430 
431  self.execute( updateStr)
432 
433  else:
434  # New task entry
435  if not createdTime:
436  createdTime = tstamp
437  if not createdUser:
438  createdUser = getUserName()
439  if not createdHost:
440  createdHost = os.uname()[1]
441  runnr = getRunFromName(dsName,None,True)
442  self.execute( ['insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
443  DbParam(dsName),',',
444  DbParam(runnr),',',
445  DbParam(taskName),',',
446  DbParam(template),',',
447  DbParam(taskpostprocsteps),',',
448  DbParam(release),',',
449  DbParam(createdTime),',',
450  DbParam(createdUser),',',
451  DbParam(createdHost),',',
452  DbParam(tstamp),',',
453  DbParam(status),',',
454  DbParam(onDisk),',',
455  DbParam(njobs),',',
456  DbParam(comment),')'],
457  True)
458 
459 

◆ deleteTask()

def python.TaskManager.TaskManager.deleteTask (   self,
  dsName,
  taskName 
)
Delete a task entry from the taskmanager database.

Definition at line 390 of file TaskManager.py.

390  def deleteTask(self,dsName,taskName):
391  """Delete a task entry from the taskmanager database."""
392  q = [ 'delete from TASKS where DSNAME =',DbParam(dsName),'and TASKNAME =',DbParam(taskName) ]
393  cursor = self.execute(q,True)
394  return cursor.rowcount
395 
396 

◆ execute()

def python.TaskManager.TaskManager.execute (   self,
  statementParts,
  doCommit = False,
  limit = None 
)
Execute a SQL statement passed as a list or tuple of statement parts, where each
   part is either a partial SQL string, or an object of type DbParam specifying a parameter
   for the SQL statement. The actual SQL statement is assembled (using the parameter style
   of the current database) and executed, and the resulting cursor is returned.
   Loosely follows the method discussed in the Python Cookbook.
   WARNING: At present, limit doesn't work when ordering rows for Oracle!

Definition at line 313 of file TaskManager.py.

313  def execute(self,statementParts, doCommit=False, limit=None):
314  """Execute a SQL statement passed as a list or tuple of statement parts, where each
315  part is either a partial SQL string, or an object of type DbParam specifying a parameter
316  for the SQL statement. The actual SQL statement is assembled (using the parameter style
317  of the current database) and executed, and the resulting cursor is returned.
318  Loosely follows the method discussed in the Python Cookbook.
319  WARNING: At present, limit doesn't work when ordering rows for Oracle!"""
320  if not self.is_managing_context:
321  print ('**WARN** TaskManager will keep the database connection open until it is deleted.')
322  print ('**INFO** TaskManager should generally only be used inside a with statement:')
323  print ('**INFO** with TaskManager(...) as taskman:')
324  print ('**INFO** # do something ...')
325 
326  if not statementParts:
327  return None
328  if isinstance(statementParts,str):
329  raise TypeError ('Must pass list or tuple to TaskManager.execute')
330  for p in statementParts:
331  if not (isinstance(p,DbParam) or isinstance(p,six.string_types)):
332  raise ValueError ('Can only pass SQL string fragments and DbParam objects in list'
333  'to TaskManager.execute, found %s with "%s"' % (type(p),str(p)))
334  sqlParts = None
335  params = None
336 
337  if self.paramstyle=='qmark':
338  sqlParts = []
339  params = []
340  for p in statementParts:
341  if isinstance(p,DbParam):
342  sqlParts.append('?')
343  params.append(p.value)
344  else:
345  sqlParts.append(p)
346 
347  if self.paramstyle=='named':
348  sqlParts = []
349  params = {}
350  for p in statementParts:
351  if isinstance(p,DbParam):
352  name = 'p%d' % len(params)
353  sqlParts.append(':%s' % name)
354  params[name] = p.value
355  else:
356  sqlParts.append(p)
357 
358  if sqlParts is None:
359  raise ValueError ('Unknown SQL parameter style %s' % self.paramstyle)
360  return None
361  sql = ' '.join(sqlParts)
362 
363  # Handle database-dependent limit clause (PRELIMINARY)
364  if limit:
365  if self.dbtype=='oracle':
366  # WARNING: the following doesn't work always when ordering rows!
367  if 'order by' in sql: # WARNING: works only if order by is in lowercase!
368  sqlParts2 = sql.split('order by')
369  sql = '%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
370  else:
371  sql += ' and ROWNUM <= %i' % limit
372  else:
373  sql += ' limit %i' % limit
374 
375  if self.debug:
376  print ('\nExecuting SQL statement: ',sql)
377  print (' with parameters: ',params)
378 
379  cursor = self.dbcon.cursor()
380  try:
381  cursor.execute(sql,params)
382  if doCommit:
383  self.dbcon.commit()
384  except Exception as e:
385  msg = '\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
386  raise TaskManagerDatabaseError (msg)
387  return cursor
388 
389 

◆ getCount()

def python.TaskManager.TaskManager.getCount (   self,
  what,
  qual = () 
)

Definition at line 519 of file TaskManager.py.

519  def getCount(self, what, qual=()):
520  # Except when using count(distinct(...)) which is not supported by earlier sqlite versions,
521  # better use getNTasks
522  q = [ 'select %s from TASKS' % what ]
523  q.extend(qual)
524  return len(self.execute(q).fetchall())
525 
526 

◆ getDSNames()

def python.TaskManager.TaskManager.getDSNames (   self,
  dsname 
)
Get all DSNAMEs for a given (partial) data set name dsname.

Definition at line 584 of file TaskManager.py.

584  def getDSNames(self,dsname):
585  """Get all DSNAMEs for a given (partial) data set name dsname."""
586  #q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by UPDATED" % dsname ]
587  q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by DSNAME" % dsname ]
588  return self.execute(q).fetchall()
589 
590 

◆ getNTasks()

def python.TaskManager.TaskManager.getNTasks (   self,
  qual = () 
)

Definition at line 527 of file TaskManager.py.

527  def getNTasks(self, qual=()):
528  q = [ 'select count(*) from TASKS' ]
529  q.extend(qual)
530  return self.execute(q).fetchone()[0]
531 

◆ getStatus()

def python.TaskManager.TaskManager.getStatus (   self,
  dsName,
  taskName 
)
Get status of a task.

Definition at line 460 of file TaskManager.py.

460  def getStatus(self,dsName,taskName):
461  """Get status of a task."""
462  cursor = self.execute( ['select STATUS from TASKS where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName) ] )
463  return cursor.fetchone()[0]
464 
465 

◆ getTaskDict()

def python.TaskManager.TaskManager.getTaskDict (   self,
  dsname,
  taskname,
  what = '*',
  qual = () 
)

Definition at line 563 of file TaskManager.py.

563  def getTaskDict(self, dsname, taskname, what='*', qual=()):
564  # NOTE: We could implement this more elegantly using row_factory (sqlite3)
565  # or rowfactory (Oracle), but this would come at the expense of a
566  # direct database dependence.
567  if qual:
568  q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME =', DbParam(taskname) ]
569  q.extend(qual)
570  else:
571  if self.dbtype=='oracle':
572  # WARNING: DOES THIS ALWAYS WORK? There anyway should only be a single task here?
573  q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'and ROWNUM <= 1 order by UPDATED' ]
574  else:
575  q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'order by UPDATED limit 1' ]
576  cursor = self.execute(q)
577  row = cursor.fetchone()
578  if row is None:
579  return {}
580  else:
581  return dictFactory(cursor,row)
582 
583 

◆ getTaskNames()

def python.TaskManager.TaskManager.getTaskNames (   self,
  dsname,
  taskname = None,
  addWildCards = True 
)
Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
   may be any partial dataset and task name, respectively.

Definition at line 591 of file TaskManager.py.

591  def getTaskNames(self,dsname,taskname=None,addWildCards=True):
592  """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
593  may be any partial dataset and task name, respectively."""
594  if addWildCards:
595  if taskname:
596  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
597  else:
598  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
599  else:
600  if taskname:
601  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
602  else:
603  q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
604  return self.execute(q).fetchall()
605 
606 

◆ getTaskValue()

def python.TaskManager.TaskManager.getTaskValue (   self,
  dsName,
  taskName,
  what 
)
Get a single value from the task database. If the query results in more than one value, only
   the first value is returned.

Definition at line 532 of file TaskManager.py.

532  def getTaskValue(self, dsName,taskName, what):
533  """Get a single value from the task database. If the query results in more than one value, only
534  the first value is returned."""
535  q = [ 'select %s from TASKS where DSNAME =' % what , DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
536  return self.execute(q).fetchone()[0]
537 

◆ getValue()

def python.TaskManager.TaskManager.getValue (   self,
  what,
  qual = () 
)
Get a single value from the task database. If the query results in more than one value, only
   the first value is returned.

Definition at line 511 of file TaskManager.py.

511  def getValue(self, what, qual=()):
512  """Get a single value from the task database. If the query results in more than one value, only
513  the first value is returned."""
514  q = [ 'select %s from TASKS' % what ]
515  q.extend(qual)
516  return self.execute(q).fetchone()[0]
517 
518 

◆ parseConnectionInfo()

def python.TaskManager.TaskManager.parseConnectionInfo (   self,
  connstring = '' 
)

Definition at line 146 of file TaskManager.py.

146  def parseConnectionInfo(self, connstring=''):
147  if not connstring:
148  connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')
149 
150  try:
151  dbtype, dbname = connstring.split(':', 1)
152  except Exception:
153  raise ValueError ('Illegal database connection string {}'.format(connstring))
154 
155  if dbtype == 'auth_file':
156  # dbname is a file with the actual connection information
157  authfile = dbname
158  try:
159  with open(authfile) as af:
160  connstring = af.read().strip()
161  dbtype, dbname = connstring.split(':', 1)
162  except Exception:
163  raise ValueError ('Invalid authorization file {} (not readable or invalid format)'.format(authfile))
164 
165  return dbtype, dbname
166 
167 

◆ setDiskStatus()

def python.TaskManager.TaskManager.setDiskStatus (   self,
  dsName,
  taskName,
  onDisk 
)
Update the onDisk-status of a task.

Definition at line 478 of file TaskManager.py.

478  def setDiskStatus(self,dsName,taskName,onDisk):
479  """Update the onDisk-status of a task."""
480  tstamp = time.time()
481  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', ONDISK =',DbParam(onDisk),
482  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
483  return cursor.rowcount
484 
485 

◆ setStatus()

def python.TaskManager.TaskManager.setStatus (   self,
  dsName,
  taskName,
  status,
  oldStatus = None 
)
Set the status of a task. If oldStatus is set, the task must be in the designated status.

Definition at line 466 of file TaskManager.py.

466  def setStatus(self,dsName,taskName,status,oldStatus=None):
467  """Set the status of a task. If oldStatus is set, the task must be in the designated status."""
468  tstamp = time.time()
469  if oldStatus:
470  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
471  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName),'and STATUS =',DbParam(oldStatus)], True)
472  else:
473  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
474  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
475  return cursor.rowcount
476 
477 

◆ setValue()

def python.TaskManager.TaskManager.setValue (   self,
  dsName,
  taskName,
  fieldName,
  value 
)
Update a field of a given task.

Definition at line 503 of file TaskManager.py.

503  def setValue(self,dsName,taskName,fieldName,value):
504  """Update a field of a given task."""
505  tstamp = time.time()
506  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', %s =' % fieldName,DbParam(value),
507  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
508  return cursor.rowcount
509 
510 

◆ taskIter()

def python.TaskManager.TaskManager.taskIter (   self,
  what = '*',
  qual = ('order by UPDATED',) 
)

Definition at line 538 of file TaskManager.py.

538  def taskIter(self, what='*', qual=('order by UPDATED',)):
539  q = [ 'select %s from TASKS' % what ]
540  q.extend(qual)
541  cursor = self.execute(q)
542  return iter(cursor.fetchone,None)
543 
544 

◆ taskIterDict()

def python.TaskManager.TaskManager.taskIterDict (   self,
  what = '*',
  qual = ('order by UPDATED',),
  limit = 999999999 
)

Definition at line 545 of file TaskManager.py.

545  def taskIterDict(self, what='*', qual=('order by UPDATED',), limit=999999999):
546  # NOTE: We could implement this more elegantly using row_factory (sqlite3)
547  # or rowfactory (Oracle), but this would come at the expense of a
548  # direct database dependence. Also, the primitive limit on the
549  # number of items returned by the generator avoids database-
550  # dependent SQL limit statements (until limit in TaskManager.execute
551  # is fully implemented also for Oracle.
552  q = [ 'select %s from TASKS' % what ]
553  q.extend(qual)
554  cursor = self.execute(q)
555  n = 0
556  while n<limit:
557  row = cursor.fetchone()
558  if row is None: break
559  n += 1
560  yield dictFactory(cursor,row)
561 
562 

◆ updateStatus()

def python.TaskManager.TaskManager.updateStatus (   self,
  dsName,
  taskName,
  status,
  nResultFiles,
  nJobs,
  nJobsSubmitted,
  nJobsRunning,
  nJobsPostProc,
  nJobsFailed,
  nJobsCompleted 
)
Update task status information including number of jobs in different states.

Definition at line 486 of file TaskManager.py.

486  def updateStatus(self,dsName,taskName,
487  status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
488  """Update task status information including number of jobs in different states."""
489  tstamp = time.time()
490  cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),
491  ', STATUS =',DbParam(status),
492  ', NRESULTFILES =',DbParam(nResultFiles),
493  ', NJOBS =',DbParam(nJobs),
494  ', NJOBS_SUBMITTED =',DbParam(nJobsSubmitted),
495  ', NJOBS_RUNNING =',DbParam(nJobsRunning),
496  ', NJOBS_POSTPROC =',DbParam(nJobsPostProc),
497  ', NJOBS_FAILED =',DbParam(nJobsFailed),
498  ', NJOBS_COMPLETED =',DbParam(nJobsCompleted),
499  'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
500  return cursor.rowcount
501 
502 

Member Data Documentation

◆ dbcon

python.TaskManager.TaskManager.dbcon

Definition at line 189 of file TaskManager.py.

◆ dbname

python.TaskManager.TaskManager.dbname

Definition at line 179 of file TaskManager.py.

◆ dbtype

python.TaskManager.TaskManager.dbtype

Definition at line 181 of file TaskManager.py.

◆ debug

python.TaskManager.TaskManager.debug

Definition at line 175 of file TaskManager.py.

◆ is_managing_context

python.TaskManager.TaskManager.is_managing_context

Definition at line 177 of file TaskManager.py.

◆ OnDiskCodes

dictionary python.TaskManager.TaskManager.OnDiskCodes
static
Initial value:
= { 'UNKNOWN': -1,
'ALLONDISK': 0,
'RESULTSONDISK': 1,
# Codes above are for jobs whose task directory is on disk
'ARCHIVED': 10,
'DELETED': 11 }

Definition at line 138 of file TaskManager.py.

◆ paramstyle

python.TaskManager.TaskManager.paramstyle

Definition at line 176 of file TaskManager.py.

◆ StatusCodes

dictionary python.TaskManager.TaskManager.StatusCodes
static
Initial value:
= { 'UNKNOWN': -1,
'CONFIGURED': 0,
'SUBMITTED': 1,
'RUNNING': 2,
'PARTIALFAILED': 3,
# Codes above are considered tasks whose status needs to
# be monitored and updated
'POSTPROCESSING': 5,
'POSTPROCRUNNING': 6,
'POSTPROCFAILED': 7,
'COMPLETED': 10,
'FAILED': 11}

Definition at line 125 of file TaskManager.py.


The documentation for this class was generated from the following file:
vtune_athena.format
format
Definition: vtune_athena.py:14
python.Utils.getRunFromName
def getRunFromName(name, default='', asInt=False)
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:13
LArG4FSStartPointFilterLegacy.execute
execute
Definition: LArG4FSStartPointFilterLegacy.py:20
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.processes.powheg.ZZ.ZZ.__init__
def __init__(self, base_directory, **kwargs)
Constructor: all process options are set here.
Definition: ZZ.py:18
Trk::open
@ open
Definition: BinningType.h:40
query_example.cursor
cursor
Definition: query_example.py:21
python.CaloScaleNoiseConfig.type
type
Definition: CaloScaleNoiseConfig.py:78
str
Definition: BTagTrackIpAccessor.cxx:11
calibdata.commit
bool commit
Definition: calibdata.py:832
python.TaskManager.dictFactory
def dictFactory(cursor, row)
Definition: TaskManager.py:35
python.Utils.getUserName
def getUserName(default='UNKNOWN')
Definition: InnerDetector/InDetExample/InDetBeamSpotExample/python/Utils.py:48