ATLAS Offline Software
Loading...
Searching...
No Matches
python.TaskManager.TaskManager Class Reference
Collaboration diagram for python.TaskManager.TaskManager:

Public Member Functions

 parseConnectionInfo (self, connstring='')
 __init__ (self, connstring='', createDatabase=False)
 __enter__ (self)
 __exit__ (self, exc_type, exc_value, traceback)
 execute (self, statementParts, doCommit=False, limit=None)
 deleteTask (self, dsName, taskName)
 addTask (self, dsName, taskName, template, release, njobs, taskpostprocsteps='', status=StatusCodes['SUBMITTED'], onDisk=OnDiskCodes['ALLONDISK'], createdTime=None, createdUser=None, createdHost=None, comment='')
 getStatus (self, dsName, taskName)
 setStatus (self, dsName, taskName, status, oldStatus=None)
 setDiskStatus (self, dsName, taskName, onDisk)
 updateStatus (self, dsName, taskName, status, nResultFiles, nJobs, nJobsSubmitted, nJobsRunning, nJobsPostProc, nJobsFailed, nJobsCompleted)
 setValue (self, dsName, taskName, fieldName, value)
 getValue (self, what, qual=())
 getCount (self, what, qual=())
 getNTasks (self, qual=())
 getTaskValue (self, dsName, taskName, what)
 taskIter (self, what=' *', qual=('order by UPDATED',))
 taskIterDict (self, what=' *', qual=('order by UPDATED',), limit=999999999)
 getTaskDict (self, dsname, taskname, what=' *', qual=())
 getDSNames (self, dsname)
 getTaskNames (self, dsname, taskname=None, addWildCards=True)

Public Attributes

bool debug = False
str paramstyle = None
bool is_managing_context = False
str dbtype
 dbname = self.__class__.parseConnectionInfo(connstring)
 dbcon = sqlite3.connect(self.dbname)

Static Public Attributes

dict StatusCodes
dict OnDiskCodes

Protected Member Functions

 _createSQLiteSchema (self)
 _createOracleSchema (self)

Detailed Description

TaskManager is tool for keeping track of JobRunner jobs.

Definition at line 120 of file TaskManager.py.

Constructor & Destructor Documentation

◆ __init__()

python.TaskManager.TaskManager.__init__ ( self,
connstring = '',
createDatabase = False )
Constructor. connstring is a connection string specifying either a SQLite file
   ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
   ("auth_file:...") with connection information. If connstring is empty, the connection
   information will be taken from TASKDB (if set), or otherwise defaults to
   'sqlite_file:taskdata.db'.

Definition at line 166 of file TaskManager.py.

166 def __init__(self, connstring='', createDatabase=False):
167 """Constructor. connstring is a connection string specifying either a SQLite file
168 ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
169 ("auth_file:...") with connection information. If connstring is empty, the connection
170 information will be taken from TASKDB (if set), or otherwise defaults to
171 'sqlite_file:taskdata.db'."""
172
173 self.debug = False
174 self.paramstyle = None
175 self.is_managing_context = False
176
177 self.dbtype, self.dbname = self.__class__.parseConnectionInfo(connstring)
178
179 if self.dbtype == 'sqlite_file':
180 import sqlite3
181 self.paramstyle = 'qmark'
182 dbexists = os.access(self.dbname, os.F_OK)
183 if dbexists and createDatabase:
184 raise ValueError ('SQLite file {} exists already - remove before recreating'.format(self.dbname))
185 if not (dbexists or createDatabase):
186 raise ValueError ('TaskManager database not found (SQLite file {})'.format(self.dbname))
187 self.dbcon = sqlite3.connect(self.dbname)
188 if createDatabase:
189 self._createSQLiteSchema()
190 elif self.dbtype == 'oracle':
191 import cx_Oracle
192 self.paramstyle = 'named'
193 try:
194 self.dbcon = cx_Oracle.connect(self.dbname)
195 except Exception:
196 print ('ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
197 time.sleep(10)
198 self.dbcon = cx_Oracle.connect(self.dbname)
199 if createDatabase:
200 self._createOracleSchema()
201 else:
202 raise ValueError ('Unknown database type: {}'.format(self.dbtype))
203

Member Function Documentation

◆ __enter__()

python.TaskManager.TaskManager.__enter__ ( self)
Remember that we're inside a 'with' statement so we can warn otherwise: 

Definition at line 204 of file TaskManager.py.

204 def __enter__(self):
205 ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
206 self.is_managing_context = True
207 return self
208

◆ __exit__()

python.TaskManager.TaskManager.__exit__ ( self,
exc_type,
exc_value,
traceback )
Close the database connection at the end of the 'with' statement. 

Definition at line 209 of file TaskManager.py.

209 def __exit__(self, exc_type, exc_value, traceback):
210 ''' Close the database connection at the end of the 'with' statement. '''
211 try:
212 self.dbcon.close()
213 except Exception:
214 print ('ERROR: Unable to close database connection')
215

◆ _createOracleSchema()

python.TaskManager.TaskManager._createOracleSchema ( self)
protected
Create the database schema for an Oracle database.

Definition at line 253 of file TaskManager.py.

253 def _createOracleSchema(self):
254 """Create the database schema for an Oracle database."""
255 try:
256 self.dbcon.cursor().execute('drop sequence TASKS_SEQ')
257 self.dbcon.cursor().execute('drop trigger TASKS_ID_TRIGGER')
258 self.dbcon.cursor().execute('drop table TASKS')
259 except Exception as e:
260 print (e)
261 self.dbcon.cursor().execute("""
262 create table tasks (
263 TASKID number(10,0),
264 DSNAME varchar2(256),
265 RUNNR number(10,0),
266 TASKNAME varchar2(256),
267 TEMPLATE varchar2(256),
268 TASKPOSTPROCSTEPS varchar2(256),
269 ATLREL varchar2(256),
270 CREATED number,
271 CREATED_USER varchar2(256),
272 CREATED_HOST varchar2(256),
273 UPDATED number,
274 STATUS number(5,0),
275 ONDISK number(5,0),
276 LOGFILES varchar2(256),
277 COOLTAGS varchar2(256),
278 NRESULTFILES number(10,0) default 0,
279 NJOBS number(10,0) default 0,
280 NJOBS_SUBMITTED number(10,0) default -1,
281 NJOBS_RUNNING number(10,0) default -1,
282 NJOBS_POSTPROC number(10,0) default -1,
283 NJOBS_FAILED number(10,0) default -1,
284 NJOBS_COMPLETED number(10,0) default -1,
285 RESULTFILES varchar2(4000),
286 RESULTLINKS varchar2(4000),
287 TASKCOMMENT varchar2(4000),
288 STATUS_POSTPROC number(5,0) default -1,
289 BASEPATH varchar2(256),
290 AUXDATA varchar2(4000),
291 constraint BEAMSPOT_TASKS_PK primary key (TASKID)
292 )
293 """)
294 self.dbcon.cursor().execute('create index TASKS_DSNAME_INDX on TASKS(DSNAME)')
295 self.dbcon.cursor().execute('create index TASKS_RUNNR_INDX on TASKS(RUNNR)')
296 self.dbcon.cursor().execute('alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
297 self.dbcon.cursor().execute('create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
298 # The following statement must not have whitespace at the beginning of the line, otherwise
299 # the trigger function won't be parsed correctly:
300 self.dbcon.cursor().execute("""
301create or replace trigger TASKS_ID_TRIGGER
302before insert on TASKS
303for each row
304begin
305 select TASKS_SEQ.nextval into :new.TASKID from dual;
306end;
307""")
308 self.dbcon.commit()
309
310

◆ _createSQLiteSchema()

python.TaskManager.TaskManager._createSQLiteSchema ( self)
protected
Create the database schema for a SQLite3 database.

Definition at line 216 of file TaskManager.py.

216 def _createSQLiteSchema(self):
217 """Create the database schema for a SQLite3 database."""
218 self.dbcon.cursor().executescript("""
219 create table tasks (
220 TASKID integer primary key autoincrement,
221 DSNAME text,
222 RUNNR integer,
223 TASKNAME text,
224 TEMPLATE text,
225 TASKPOSTPROCSTEPS text,
226 ATLREL text,
227 CREATED real,
228 CREATED_USER text,
229 CREATED_HOST text,
230 UPDATED real,
231 STATUS integer,
232 ONDISK integer,
233 LOGFILES text,
234 COOLTAGS text,
235 NRESULTFILES integer default 0,
236 NJOBS integer default 0,
237 NJOBS_SUBMITTED integer default -1,
238 NJOBS_RUNNING integer default -1,
239 NJOBS_POSTPROC integer default -1,
240 NJOBS_FAILED integer default -1,
241 NJOBS_COMPLETED integer default -1,
242 RESULTFILES text,
243 RESULTLINKS text,
244 TASKCOMMENT text,
245 STATUS_POSTPROC integer,
246 BASEPATH text,
247 AUXDATA text
248 );
249 """)
250 self.dbcon.commit()
251
252

◆ addTask()

python.TaskManager.TaskManager.addTask ( self,
dsName,
taskName,
template,
release,
njobs,
taskpostprocsteps = '',
status = StatusCodes['SUBMITTED'],
onDisk = OnDiskCodes['ALLONDISK'],
createdTime = None,
createdUser = None,
createdHost = None,
comment = '' )
Add an entry for a new task if the task doesn't exist already. If the task exists,
   its UPDATED, NJOBS, STATUS and ONDISK fields will be updated.

Definition at line 395 of file TaskManager.py.

402 comment=''):
403 """Add an entry for a new task if the task doesn't exist already. If the task exists,
404 its UPDATED, NJOBS, STATUS and ONDISK fields will be updated."""
405 tstamp = time.time()
406 nDefined = self.getNTasks(['where DSNAME =',DbParam(dsName),'and TASKNAME=',DbParam(taskName)])
407 updateStatus = self.getNTasks(['where DSNAME = ',DbParam(dsName),'and TASKNAME=',DbParam(taskName),'and STATUS < %i' % TaskManager.StatusCodes['POSTPROCRUNNING']])
408
409 if nDefined:
410 # Task entry exists already; only update UPDATED, NJOBS, STATUS, ONDISK and possibly ATLREL
411 task = self.getTaskDict(dsName,taskName)
412 if task['TEMPLATE']!=template:
413 print ('ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task['TEMPLATE'],template))
414 else:
415 updateStr = ['update TASKS set UPDATED =',DbParam(tstamp),
416 ', NJOBS = ',DbParam(task['NJOBS']+njobs),
417 ', ONDISK = ',DbParam(onDisk)]
418
419 if release not in task['ATLREL']:
420 print ('WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task['ATLREL'],release))
421 release = '; '.join([task['ATLREL'],release])
422 updateStr += [', ATLREL = ',DbParam(release)]
423
424 if updateStatus:
425 updateStr += [', STATUS = ',DbParam(status)]
426
427 updateStr += ['where DSNAME = ',DbParam(dsName),'and TASKNAME = ',DbParam(taskName)]
428
429 self.execute( updateStr)
430
431 else:
432 # New task entry
433 if not createdTime:
434 createdTime = tstamp
435 if not createdUser:
436 createdUser = getUserName()
437 if not createdHost:
438 createdHost = os.uname()[1]
439 runnr = getRunFromName(dsName,None,True)
440 self.execute( ['insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
441 DbParam(dsName),',',
442 DbParam(runnr),',',
443 DbParam(taskName),',',
444 DbParam(template),',',
445 DbParam(taskpostprocsteps),',',
446 DbParam(release),',',
447 DbParam(createdTime),',',
448 DbParam(createdUser),',',
449 DbParam(createdHost),',',
450 DbParam(tstamp),',',
451 DbParam(status),',',
452 DbParam(onDisk),',',
453 DbParam(njobs),',',
454 DbParam(comment),')'],
455 True)
456
457

◆ deleteTask()

python.TaskManager.TaskManager.deleteTask ( self,
dsName,
taskName )
Delete a task entry from the taskmanager database.

Definition at line 388 of file TaskManager.py.

388 def deleteTask(self,dsName,taskName):
389 """Delete a task entry from the taskmanager database."""
390 q = [ 'delete from TASKS where DSNAME =',DbParam(dsName),'and TASKNAME =',DbParam(taskName) ]
391 cursor = self.execute(q,True)
392 return cursor.rowcount
393
394

◆ execute()

python.TaskManager.TaskManager.execute ( self,
statementParts,
doCommit = False,
limit = None )
Execute a SQL statement passed as a list or tuple of statement parts, where each
   part is either a partial SQL string, or an object of type DbParam specifying a parameter
   for the SQL statement. The actual SQL statement is assembled (using the parameter style
   of the current database) and executed, and the resulting cursor is returned.
   Loosely follows the method discussed in the Python Cookbook.
   WARNING: At present, limit doesn't work when ordering rows for Oracle!

Definition at line 311 of file TaskManager.py.

311 def execute(self,statementParts, doCommit=False, limit=None):
312 """Execute a SQL statement passed as a list or tuple of statement parts, where each
313 part is either a partial SQL string, or an object of type DbParam specifying a parameter
314 for the SQL statement. The actual SQL statement is assembled (using the parameter style
315 of the current database) and executed, and the resulting cursor is returned.
316 Loosely follows the method discussed in the Python Cookbook.
317 WARNING: At present, limit doesn't work when ordering rows for Oracle!"""
318 if not self.is_managing_context:
319 print ('**WARN** TaskManager will keep the database connection open until it is deleted.')
320 print ('**INFO** TaskManager should generally only be used inside a with statement:')
321 print ('**INFO** with TaskManager(...) as taskman:')
322 print ('**INFO** # do something ...')
323
324 if not statementParts:
325 return None
326 if isinstance(statementParts,str):
327 raise TypeError ('Must pass list or tuple to TaskManager.execute')
328 for p in statementParts:
329 if not (isinstance(p,DbParam) or isinstance(p,str)):
330 raise ValueError ('Can only pass SQL string fragments and DbParam objects in list'
331 'to TaskManager.execute, found %s with "%s"' % (type(p),str(p)))
332 sqlParts = None
333 params = None
334
335 if self.paramstyle=='qmark':
336 sqlParts = []
337 params = []
338 for p in statementParts:
339 if isinstance(p,DbParam):
340 sqlParts.append('?')
341 params.append(p.value)
342 else:
343 sqlParts.append(p)
344
345 if self.paramstyle=='named':
346 sqlParts = []
347 params = {}
348 for p in statementParts:
349 if isinstance(p,DbParam):
350 name = 'p%d' % len(params)
351 sqlParts.append(':%s' % name)
352 params[name] = p.value
353 else:
354 sqlParts.append(p)
355
356 if sqlParts is None:
357 raise ValueError ('Unknown SQL parameter style %s' % self.paramstyle)
358 return None
359 sql = ' '.join(sqlParts)
360
361 # Handle database-dependent limit clause (PRELIMINARY)
362 if limit:
363 if self.dbtype=='oracle':
364 # WARNING: the following doesn't work always when ordering rows!
365 if 'order by' in sql: # WARNING: works only if order by is in lowercase!
366 sqlParts2 = sql.split('order by')
367 sql = '%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
368 else:
369 sql += ' and ROWNUM <= %i' % limit
370 else:
371 sql += ' limit %i' % limit
372
373 if self.debug:
374 print ('\nExecuting SQL statement: ',sql)
375 print (' with parameters: ',params)
376
377 cursor = self.dbcon.cursor()
378 try:
379 cursor.execute(sql,params)
380 if doCommit:
381 self.dbcon.commit()
382 except Exception as e:
383 msg = '\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
384 raise TaskManagerDatabaseError (msg)
385 return cursor
386
387

◆ getCount()

python.TaskManager.TaskManager.getCount ( self,
what,
qual = () )

Definition at line 517 of file TaskManager.py.

517 def getCount(self, what, qual=()):
518 # Except when using count(distinct(...)) which is not supported by earlier sqlite versions,
519 # better use getNTasks
520 q = [ 'select %s from TASKS' % what ]
521 q.extend(qual)
522 return len(self.execute(q).fetchall())
523
524

◆ getDSNames()

python.TaskManager.TaskManager.getDSNames ( self,
dsname )
Get all DSNAMEs for a given (partial) data set name dsname.

Definition at line 582 of file TaskManager.py.

582 def getDSNames(self,dsname):
583 """Get all DSNAMEs for a given (partial) data set name dsname."""
584 #q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by UPDATED" % dsname ]
585 q = [ "select distinct(DSNAME) from TASKS where DSNAME like '%%%s%%' order by DSNAME" % dsname ]
586 return self.execute(q).fetchall()
587
588

◆ getNTasks()

python.TaskManager.TaskManager.getNTasks ( self,
qual = () )

Definition at line 525 of file TaskManager.py.

525 def getNTasks(self, qual=()):
526 q = [ 'select count(*) from TASKS' ]
527 q.extend(qual)
528 return self.execute(q).fetchone()[0]
529

◆ getStatus()

python.TaskManager.TaskManager.getStatus ( self,
dsName,
taskName )
Get status of a task.

Definition at line 458 of file TaskManager.py.

458 def getStatus(self,dsName,taskName):
459 """Get status of a task."""
460 cursor = self.execute( ['select STATUS from TASKS where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName) ] )
461 return cursor.fetchone()[0]
462
463

◆ getTaskDict()

python.TaskManager.TaskManager.getTaskDict ( self,
dsname,
taskname,
what = '*',
qual = () )

Definition at line 561 of file TaskManager.py.

561 def getTaskDict(self, dsname, taskname, what='*', qual=()):
562 # NOTE: We could implement this more elegantly using row_factory (sqlite3)
563 # or rowfactory (Oracle), but this would come at the expense of a
564 # direct database dependence.
565 if qual:
566 q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME =', DbParam(taskname) ]
567 q.extend(qual)
568 else:
569 if self.dbtype=='oracle':
570 # WARNING: DOES THIS ALWAYS WORK? There anyway should only be a single task here?
571 q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'and ROWNUM <= 1 order by UPDATED' ]
572 else:
573 q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'order by UPDATED limit 1' ]
574 cursor = self.execute(q)
575 row = cursor.fetchone()
576 if row is None:
577 return {}
578 else:
579 return dictFactory(cursor,row)
580
581

◆ getTaskNames()

python.TaskManager.TaskManager.getTaskNames ( self,
dsname,
taskname = None,
addWildCards = True )
Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
   may be any partial dataset and task name, respectively.

Definition at line 589 of file TaskManager.py.

589 def getTaskNames(self,dsname,taskname=None,addWildCards=True):
590 """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
591 may be any partial dataset and task name, respectively."""
592 if addWildCards:
593 if taskname:
594 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
595 else:
596 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
597 else:
598 if taskname:
599 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
600 else:
601 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
602 return self.execute(q).fetchall()
603
604

◆ getTaskValue()

python.TaskManager.TaskManager.getTaskValue ( self,
dsName,
taskName,
what )
Get a single value from the task database. If the query results in more than one value, only
   the first value is returned.

Definition at line 530 of file TaskManager.py.

530 def getTaskValue(self, dsName,taskName, what):
531 """Get a single value from the task database. If the query results in more than one value, only
532 the first value is returned."""
533 q = [ 'select %s from TASKS where DSNAME =' % what , DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
534 return self.execute(q).fetchone()[0]
535

◆ getValue()

python.TaskManager.TaskManager.getValue ( self,
what,
qual = () )
Get a single value from the task database. If the query results in more than one value, only
   the first value is returned.

Definition at line 509 of file TaskManager.py.

509 def getValue(self, what, qual=()):
510 """Get a single value from the task database. If the query results in more than one value, only
511 the first value is returned."""
512 q = [ 'select %s from TASKS' % what ]
513 q.extend(qual)
514 return self.execute(q).fetchone()[0]
515
516

◆ parseConnectionInfo()

python.TaskManager.TaskManager.parseConnectionInfo ( self,
connstring = '' )

Definition at line 144 of file TaskManager.py.

144 def parseConnectionInfo(self, connstring=''):
145 if not connstring:
146 connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')
147
148 try:
149 dbtype, dbname = connstring.split(':', 1)
150 except Exception:
151 raise ValueError ('Illegal database connection string {}'.format(connstring))
152
153 if dbtype == 'auth_file':
154 # dbname is a file with the actual connection information
155 authfile = dbname
156 try:
157 with open(authfile) as af:
158 connstring = af.read().strip()
159 dbtype, dbname = connstring.split(':', 1)
160 except Exception:
161 raise ValueError ('Invalid authorization file {} (not readable or invalid format)'.format(authfile))
162
163 return dbtype, dbname
164
165

◆ setDiskStatus()

python.TaskManager.TaskManager.setDiskStatus ( self,
dsName,
taskName,
onDisk )
Update the onDisk-status of a task.

Definition at line 476 of file TaskManager.py.

476 def setDiskStatus(self,dsName,taskName,onDisk):
477 """Update the onDisk-status of a task."""
478 tstamp = time.time()
479 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', ONDISK =',DbParam(onDisk),
480 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
481 return cursor.rowcount
482
483

◆ setStatus()

python.TaskManager.TaskManager.setStatus ( self,
dsName,
taskName,
status,
oldStatus = None )
Set the status of a task. If oldStatus is set, the task must be in the designated status.

Definition at line 464 of file TaskManager.py.

464 def setStatus(self,dsName,taskName,status,oldStatus=None):
465 """Set the status of a task. If oldStatus is set, the task must be in the designated status."""
466 tstamp = time.time()
467 if oldStatus:
468 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
469 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName),'and STATUS =',DbParam(oldStatus)], True)
470 else:
471 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
472 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
473 return cursor.rowcount
474
475

◆ setValue()

python.TaskManager.TaskManager.setValue ( self,
dsName,
taskName,
fieldName,
value )
Update a field of a given task.

Definition at line 501 of file TaskManager.py.

501 def setValue(self,dsName,taskName,fieldName,value):
502 """Update a field of a given task."""
503 tstamp = time.time()
504 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', %s =' % fieldName,DbParam(value),
505 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
506 return cursor.rowcount
507
508

◆ taskIter()

python.TaskManager.TaskManager.taskIter ( self,
what = '*',
qual = ('order by UPDATED',) )

Definition at line 536 of file TaskManager.py.

536 def taskIter(self, what='*', qual=('order by UPDATED',)):
537 q = [ 'select %s from TASKS' % what ]
538 q.extend(qual)
539 cursor = self.execute(q)
540 return iter(cursor.fetchone,None)
541
542

◆ taskIterDict()

python.TaskManager.TaskManager.taskIterDict ( self,
what = '*',
qual = ('order by UPDATED',),
limit = 999999999 )

Definition at line 543 of file TaskManager.py.

543 def taskIterDict(self, what='*', qual=('order by UPDATED',), limit=999999999):
544 # NOTE: We could implement this more elegantly using row_factory (sqlite3)
545 # or rowfactory (Oracle), but this would come at the expense of a
546 # direct database dependence. Also, the primitive limit on the
547 # number of items returned by the generator avoids database-
548 # dependent SQL limit statements (until limit in TaskManager.execute
549 # is fully implemented also for Oracle.
550 q = [ 'select %s from TASKS' % what ]
551 q.extend(qual)
552 cursor = self.execute(q)
553 n = 0
554 while n<limit:
555 row = cursor.fetchone()
556 if row is None: break
557 n += 1
558 yield dictFactory(cursor,row)
559
560

◆ updateStatus()

python.TaskManager.TaskManager.updateStatus ( self,
dsName,
taskName,
status,
nResultFiles,
nJobs,
nJobsSubmitted,
nJobsRunning,
nJobsPostProc,
nJobsFailed,
nJobsCompleted )
Update task status information including number of jobs in different states.

Definition at line 484 of file TaskManager.py.

485 status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
486 """Update task status information including number of jobs in different states."""
487 tstamp = time.time()
488 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),
489 ', STATUS =',DbParam(status),
490 ', NRESULTFILES =',DbParam(nResultFiles),
491 ', NJOBS =',DbParam(nJobs),
492 ', NJOBS_SUBMITTED =',DbParam(nJobsSubmitted),
493 ', NJOBS_RUNNING =',DbParam(nJobsRunning),
494 ', NJOBS_POSTPROC =',DbParam(nJobsPostProc),
495 ', NJOBS_FAILED =',DbParam(nJobsFailed),
496 ', NJOBS_COMPLETED =',DbParam(nJobsCompleted),
497 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
498 return cursor.rowcount
499
500

Member Data Documentation

◆ dbcon

python.TaskManager.TaskManager.dbcon = sqlite3.connect(self.dbname)

Definition at line 187 of file TaskManager.py.

◆ dbname

python.TaskManager.TaskManager.dbname = self.__class__.parseConnectionInfo(connstring)

Definition at line 177 of file TaskManager.py.

◆ dbtype

python.TaskManager.TaskManager.dbtype

Definition at line 177 of file TaskManager.py.

◆ debug

bool python.TaskManager.TaskManager.debug = False

Definition at line 173 of file TaskManager.py.

◆ is_managing_context

bool python.TaskManager.TaskManager.is_managing_context = False

Definition at line 175 of file TaskManager.py.

◆ OnDiskCodes

dict python.TaskManager.TaskManager.OnDiskCodes
static
Initial value:
= { 'UNKNOWN': -1,
'ALLONDISK': 0,
'RESULTSONDISK': 1,
# Codes above are for jobs whose task directory is on disk
'ARCHIVED': 10,
'DELETED': 11 }

Definition at line 136 of file TaskManager.py.

◆ paramstyle

python.TaskManager.TaskManager.paramstyle = None

Definition at line 174 of file TaskManager.py.

◆ StatusCodes

dict python.TaskManager.TaskManager.StatusCodes
static
Initial value:
= { 'UNKNOWN': -1,
'CONFIGURED': 0,
'SUBMITTED': 1,
'RUNNING': 2,
'PARTIALFAILED': 3,
# Codes above are considered tasks whose status needs to
# be monitored and updated
'POSTPROCESSING': 5,
'POSTPROCRUNNING': 6,
'POSTPROCFAILED': 7,
'COMPLETED': 10,
'FAILED': 11}

Definition at line 123 of file TaskManager.py.


The documentation for this class was generated from the following file: