# parseConnectionInfo: split a connection string of the form '<dbtype>:<dbname>'
# into the tuple (dbtype, dbname).
#   connstring -- connection string; when empty, the TASKDB environment variable
#                 is used, falling back to 'sqlite_file:taskdata.db'.
# Returns (dbtype, dbname). The split is on the FIRST ':' only, so dbname may itself
# contain ':' characters (e.g. 'oracle://...' yields dbtype 'oracle', dbname '//...').
# If dbtype is 'auth_file', dbname names a file whose stripped contents are parsed
# again as '<dbtype>:<dbname>'. Only one level of indirection is resolved: the
# dbtype read from the file is returned as-is.
# Raises ValueError for a malformed connection string, or for an authorization
# file that cannot be read or parsed.
# NOTE(review): __init__ calls this as self.__class__.parseConnectionInfo(connstring),
# so it is presumably decorated @classmethod on a line not shown in this listing.
# NOTE(review): this listing omits the emptiness guard before the TASKDB lookup,
# the try/except wrappers around the split() calls and the assignment of
# 'authfile' (presumably from dbname, per the comment below). Confirm against the
# full file.
144 def parseConnectionInfo(self, connstring=''):
146 connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')
149 dbtype, dbname = connstring.split(':', 1)
151 raise ValueError ('Illegal database connection string {}'.format(connstring))
153 if dbtype == 'auth_file':
154 # dbname is a file with the actual connection information
157 with open(authfile) as af:
158 connstring = af.read().strip()
159 dbtype, dbname = connstring.split(':', 1)
161 raise ValueError ('Invalid authorization file {} (not readable or invalid format)'.format(authfile))
163 return dbtype, dbname
# __init__: open a TaskManager connection to a SQLite file or an Oracle database.
#   connstring     -- see the docstring below and parseConnectionInfo().
#   createDatabase -- request creation of a new database/schema.
# Sets:
#   self.dbtype, self.dbname -- from parseConnectionInfo()
#   self.paramstyle          -- 'qmark' for SQLite, 'named' for Oracle; execute()
#                               uses it to build placeholders
#   self.dbcon               -- the DB-API connection
#   self.is_managing_context -- starts False; execute() prints a usage warning
#                               while it is False
# Raises ValueError if:
#   - the SQLite file already exists while createDatabase is set, or
#   - the SQLite file is missing while createDatabase is not set, or
#   - dbtype is neither 'sqlite_file' nor 'oracle'.
# WARNING: for Oracle, _createOracleSchema() drops any existing TASKS table,
# sequence and trigger before recreating them.
# NOTE(review): this listing omits the conditions around the schema-creation calls,
# the try/except around the first cx_Oracle.connect(), the 10 s wait announced by
# the printed message, and the 'else:' before the final raise. Confirm against the
# full file.
166 def __init__(self, connstring='', createDatabase=False):
167 """Constructor. connstring is a connection string specifying either a SQLite file
168 ("sqlite_file:..."), an Oracle database ("oracle://..."), or an authorization file
169 ("auth_file:...") with connection information. If connstring is empty, the connection
170 information will be taken from TASKDB (if set), or otherwise defaults to
171 'sqlite_file:taskdata.db'."""
174 self.paramstyle = None
175 self.is_managing_context = False
177 self.dbtype, self.dbname = self.__class__.parseConnectionInfo(connstring)
179 if self.dbtype == 'sqlite_file':
181 self.paramstyle = 'qmark'
# os.F_OK only tests that the path exists, not that it is a readable SQLite file.
182 dbexists = os.access(self.dbname, os.F_OK)
183 if dbexists and createDatabase:
184 raise ValueError ('SQLite file {} exists already - remove before recreating'.format(self.dbname))
185 if not (dbexists or createDatabase):
186 raise ValueError ('TaskManager database not found (SQLite file {})'.format(self.dbname))
187 self.dbcon = sqlite3.connect(self.dbname)
189 self._createSQLiteSchema()
190 elif self.dbtype == 'oracle':
192 self.paramstyle = 'named'
194 self.dbcon = cx_Oracle.connect(self.dbname)
196 print ('ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
198 self.dbcon = cx_Oracle.connect(self.dbname)
200 self._createOracleSchema()
202 raise ValueError ('Unknown database type: {}'.format(self.dbtype))
# _createSQLiteSchema: create the TASKS table in a new SQLite database with a single
# executescript() call on self.dbcon.
# Column defaults:
#   TASKID                   -- autoincrement primary key
#   NRESULTFILES, NJOBS      -- default 0
#   NJOBS_SUBMITTED, NJOBS_RUNNING, NJOBS_POSTPROC, NJOBS_FAILED, NJOBS_COMPLETED
#                            -- default -1 (presumably "not yet known"; confirm)
# NOTE(review): only some column definitions (and not the end of the SQL literal)
# are visible in this listing.
# NOTE(review): STATUS_POSTPROC has no default here, while the Oracle schema declares
# 'STATUS_POSTPROC number(5,0) default -1'. New rows may therefore get NULL on SQLite
# but -1 on Oracle. Confirm whether this difference is intended.
216 def _createSQLiteSchema(self):
217 """Create the database schema for a SQLite3 database."""
218 self.dbcon.cursor().executescript("""
220 TASKID integer primary key autoincrement,
225 TASKPOSTPROCSTEPS text,
235 NRESULTFILES integer default 0,
236 NJOBS integer default 0,
237 NJOBS_SUBMITTED integer default -1,
238 NJOBS_RUNNING integer default -1,
239 NJOBS_POSTPROC integer default -1,
240 NJOBS_FAILED integer default -1,
241 NJOBS_COMPLETED integer default -1,
245 STATUS_POSTPROC integer,
# _createOracleSchema: (re)create the TASKS schema in an Oracle database.
# Steps:
#   1. Drop any existing TASKS_SEQ sequence, TASKS_ID_TRIGGER trigger and TASKS
#      table. Exceptions from the drops are caught (presumably so a first-time setup,
#      where the objects do not exist yet, does not fail).
#   2. Create the TASKS table with primary key BEAMSPOT_TASKS_PK.
#   3. Add indexes on DSNAME and RUNNR, and a unique constraint on (TASKNAME, DSNAME).
#   4. Create the sequence TASKS_SEQ and a before-insert trigger that fills TASKID
#      from TASKS_SEQ.nextval, which plays the role of SQLite's autoincrement key.
# WARNING: destructive. An existing TASKS table and all its rows are dropped.
# NOTE(review): the three drops appear to share one try block. If the first drop
# fails (e.g. the sequence is missing), the trigger/table drops are skipped and
# 'create table' may then fail on an existing TASKS table. Confirm this is acceptable.
# NOTE(review): this listing omits the 'try:' line, the body of the except handler,
# several column definitions and the end of each SQL literal.
253 def _createOracleSchema(self):
254 """Create the database schema for an Oracle database."""
256 self.dbcon.cursor().execute('drop sequence TASKS_SEQ')
257 self.dbcon.cursor().execute('drop trigger TASKS_ID_TRIGGER')
258 self.dbcon.cursor().execute('drop table TASKS')
259 except Exception as e:
261 self.dbcon.cursor().execute("""
264 DSNAME varchar2(256),
266 TASKNAME varchar2(256),
267 TEMPLATE varchar2(256),
268 TASKPOSTPROCSTEPS varchar2(256),
269 ATLREL varchar2(256),
271 CREATED_USER varchar2(256),
272 CREATED_HOST varchar2(256),
276 LOGFILES varchar2(256),
277 COOLTAGS varchar2(256),
278 NRESULTFILES number(10,0) default 0,
279 NJOBS number(10,0) default 0,
280 NJOBS_SUBMITTED number(10,0) default -1,
281 NJOBS_RUNNING number(10,0) default -1,
282 NJOBS_POSTPROC number(10,0) default -1,
283 NJOBS_FAILED number(10,0) default -1,
284 NJOBS_COMPLETED number(10,0) default -1,
285 RESULTFILES varchar2(4000),
286 RESULTLINKS varchar2(4000),
287 TASKCOMMENT varchar2(4000),
288 STATUS_POSTPROC number(5,0) default -1,
289 BASEPATH varchar2(256),
290 AUXDATA varchar2(4000),
291 constraint BEAMSPOT_TASKS_PK primary key (TASKID)
294 self.dbcon.cursor().execute('create index TASKS_DSNAME_INDX on TASKS(DSNAME)')
295 self.dbcon.cursor().execute('create index TASKS_RUNNR_INDX on TASKS(RUNNR)')
296 self.dbcon.cursor().execute('alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
297 self.dbcon.cursor().execute('create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
298 # The following statement must not have whitespace at the beginning of the line, otherwise
299 # the trigger function won't be parsed correctly:
300 self.dbcon.cursor().execute("""
301create or replace trigger TASKS_ID_TRIGGER
302before insert on TASKS
305 select TASKS_SEQ.nextval into :new.TASKID from dual;
# execute: assemble a SQL statement from string fragments and DbParam objects, bind
# the parameters in the connection's paramstyle, and run it.
#   statementParts -- list/tuple of str and DbParam. A bare str raises TypeError;
#                     any other element type raises ValueError. (How an empty
#                     value is handled is on a line not shown.)
#   doCommit       -- presumably commits after executing (commit line not shown).
#   limit          -- optional maximum row count:
#                       SQLite: ' limit N' is appended.
#                       Oracle: ' and ROWNUM <= N' is appended, or inserted before
#                               a lowercase 'order by'.
# Returns the DB-API cursor on which the statement was executed.
# Raises TaskManagerDatabaseError when the guarded cursor.execute fails; the message
# contains the SQL, the parameters and the original error.
# Parameter binding:
#   'qmark' -- values go into a positional list (placeholder line not shown).
#   'named' -- placeholders ':p0', ':p1', ... with the values in a dict keyed by name.
# Fragments and placeholders are joined with single spaces.
# NOTE(review): the Oracle ROWNUM clause starts with 'and', so it assumes the
# statement already has a WHERE clause. ROWNUM is also applied before ORDER BY
# sorting, so the limited rows are not the "first N" in sort order.
# NOTE(review): the two adjacent literals in the ValueError message concatenate to
# '...DbParam objects in listto TaskManager.execute...' because a space is missing.
# NOTE(review): TaskManagerDatabaseError is raised without 'from e', so the original
# exception survives only as implicit context.
311 def execute(self,statementParts, doCommit=False, limit=None):
312 """Execute a SQL statement passed as a list or tuple of statement parts, where each
313 part is either a partial SQL string, or an object of type DbParam specifying a parameter
314 for the SQL statement. The actual SQL statement is assembled (using the parameter style
315 of the current database) and executed, and the resulting cursor is returned.
316 Loosely follows the method discussed in the Python Cookbook.
317 WARNING: At present, limit doesn't work when ordering rows for Oracle!"""
318 if not self.is_managing_context:
319 print ('**WARN** TaskManager will keep the database connection open until it is deleted.')
320 print ('**INFO** TaskManager should generally only be used inside a with statement:')
321 print ('**INFO** with TaskManager(...) as taskman:')
322 print ('**INFO** # do something ...')
324 if not statementParts:
326 if isinstance(statementParts,str):
327 raise TypeError ('Must pass list or tuple to TaskManager.execute')
328 for p in statementParts:
329 if not (isinstance(p,DbParam) or isinstance(p,str)):
330 raise ValueError ('Can only pass SQL string fragments and DbParam objects in list'
331 'to TaskManager.execute, found %s with "%s"' % (type(p),str(p)))
335 if self.paramstyle=='qmark':
338 for p in statementParts:
339 if isinstance(p,DbParam):
341 params.append(p.value)
345 if self.paramstyle=='named':
348 for p in statementParts:
349 if isinstance(p,DbParam):
350 name = 'p%d' % len(params)
351 sqlParts.append(':%s' % name)
352 params[name] = p.value
357 raise ValueError ('Unknown SQL parameter style %s' % self.paramstyle)
# All fragments/placeholders are joined with single spaces.
359 sql = ' '.join(sqlParts)
361 # Handle database-dependent limit clause (PRELIMINARY)
363 if self.dbtype=='oracle':
364 # WARNING: the following doesn't work always when ordering rows!
365 if 'order by' in sql: # WARNING: works only if order by is in lowercase!
366 sqlParts2 = sql.split('order by')
367 sql = '%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
369 sql += ' and ROWNUM <= %i' % limit
371 sql += ' limit %i' % limit
374 print ('\nExecuting SQL statement: ',sql)
375 print (' with parameters: ',params)
377 cursor = self.dbcon.cursor()
379 cursor.execute(sql,params)
382 except Exception as e:
383 msg = '\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
384 raise TaskManagerDatabaseError (msg)
# addTask: insert a new TASKS row, or update the existing row with the same
# (DSNAME, TASKNAME).
# Existing task:
#   - Prints an ERROR if the stored TEMPLATE differs. (Whether it then returns is on
#     a line not shown.)
#   - Sets UPDATED and ONDISK, and adds njobs to the stored NJOBS.
#   - Appends the new release to ATLREL ('; '-separated) when it is not already
#     contained there.
#   - Sets STATUS, presumably only when the task's STATUS is below POSTPROCRUNNING
#     (that is what the 'updateStatus' count measures).
# New task:
#   - Records creation time, user (getUserName()), host (os.uname()[1]) and the run
#     number derived from dsName via getRunFromName().
#   - Inserts the row.
# NOTE(review): 'release not in task['ATLREL']' is a substring test. For example,
# release '21.0.1' counts as already present when ATLREL holds '21.0.10'.
# NOTE(review): the WARNING message contains a doubled '=' ('release = =').
# NOTE(review): the local name 'updateStatus' matches the updateStatus() method
# name. This is harmless (the method is reached via self) but confusing.
# NOTE(review): some parameters, the tstamp/createdTime assignments and the branch
# conditions are on lines not shown in this listing.
395 def addTask(self,dsName,taskName,template,release,njobs,
396 taskpostprocsteps='',
397 status=StatusCodes['SUBMITTED'],
398 onDisk=OnDiskCodes['ALLONDISK'],
403 """Add an entry for a new task if the task doesn't exist already. If the task exists,
404 its UPDATED, NJOBS, STATUS and ONDISK fields will be updated."""
406 nDefined = self.getNTasks(['where DSNAME =',DbParam(dsName),'and TASKNAME=',DbParam(taskName)])
407 updateStatus = self.getNTasks(['where DSNAME = ',DbParam(dsName),'and TASKNAME=',DbParam(taskName),'and STATUS < %i' % TaskManager.StatusCodes['POSTPROCRUNNING']])
410 # Task entry exists already; only update UPDATED, NJOBS, STATUS, ONDISK and possibly ATLREL
411 task = self.getTaskDict(dsName,taskName)
412 if task['TEMPLATE']!=template:
413 print ('ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task['TEMPLATE'],template))
415 updateStr = ['update TASKS set UPDATED =',DbParam(tstamp),
416 ', NJOBS = ',DbParam(task['NJOBS']+njobs),
417 ', ONDISK = ',DbParam(onDisk)]
419 if release not in task['ATLREL']:
420 print ('WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = = %s vs %s' % (dsName,taskName,task['ATLREL'],release))
421 release = '; '.join([task['ATLREL'],release])
422 updateStr += [', ATLREL = ',DbParam(release)]
425 updateStr += [', STATUS = ',DbParam(status)]
427 updateStr += ['where DSNAME = ',DbParam(dsName),'and TASKNAME = ',DbParam(taskName)]
429 self.execute( updateStr)
# New task: gather creation metadata, then insert the row.
436 createdUser = getUserName()
438 createdHost = os.uname()[1]
439 runnr = getRunFromName(dsName,None,True)
440 self.execute( ['insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
443 DbParam(taskName),',',
444 DbParam(template),',',
445 DbParam(taskpostprocsteps),',',
446 DbParam(release),',',
447 DbParam(createdTime),',',
448 DbParam(createdUser),',',
449 DbParam(createdHost),',',
454 DbParam(comment),')'],
# setStatus: set STATUS (and refresh UPDATED) for the task (dsName, taskName),
# calling execute() with doCommit=True.
# When oldStatus is given (presumably 'is not None'; the branch line is not shown),
# the UPDATE is also restricted to rows whose current STATUS equals oldStatus.
# Returns cursor.rowcount: the number of rows updated. It is 0 if the task does not
# exist or, when oldStatus is given, is not currently in that status.
# NOTE(review): the tstamp assignment is on a line not shown in this listing.
464 def setStatus(self,dsName,taskName,status,oldStatus=None):
465 """Set the status of a task. If oldStatus is set, the task must be in the designated status."""
468 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
469 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName),'and STATUS =',DbParam(oldStatus)], True)
471 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
472 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
473 return cursor.rowcount
# setDiskStatus: set ONDISK (and refresh UPDATED) for the task (dsName, taskName),
# calling execute() with doCommit=True.
# Returns cursor.rowcount: the number of rows updated (0 if no such task).
# NOTE(review): the tstamp assignment is on a line not shown in this listing.
476 def setDiskStatus(self,dsName,taskName,onDisk):
477 """Update the onDisk-status of a task."""
479 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', ONDISK =',DbParam(onDisk),
480 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
481 return cursor.rowcount
# updateStatus: in one UPDATE on the task (dsName, taskName), set UPDATED, STATUS,
# NRESULTFILES and NJOBS, plus the job counters NJOBS_SUBMITTED, NJOBS_RUNNING,
# NJOBS_POSTPROC, NJOBS_FAILED and NJOBS_COMPLETED. Calls execute() with
# doCommit=True.
# All values are bound as parameters.
# Returns cursor.rowcount: the number of rows updated (0 if no such task).
# NOTE(review): the tstamp assignment is on a line not shown in this listing.
484 def updateStatus(self,dsName,taskName,
485 status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
486 """Update task status information including number of jobs in different states."""
488 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),
489 ', STATUS =',DbParam(status),
490 ', NRESULTFILES =',DbParam(nResultFiles),
491 ', NJOBS =',DbParam(nJobs),
492 ', NJOBS_SUBMITTED =',DbParam(nJobsSubmitted),
493 ', NJOBS_RUNNING =',DbParam(nJobsRunning),
494 ', NJOBS_POSTPROC =',DbParam(nJobsPostProc),
495 ', NJOBS_FAILED =',DbParam(nJobsFailed),
496 ', NJOBS_COMPLETED =',DbParam(nJobsCompleted),
497 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
498 return cursor.rowcount
# setValue: set an arbitrary column fieldName to value (and refresh UPDATED) for the
# task (dsName, taskName), calling execute() with doCommit=True.
# Returns cursor.rowcount: the number of rows updated (0 if no such task).
# NOTE(review): fieldName is %-formatted directly into the SET clause, unquoted and
# unvalidated. Only trusted column names may be passed, because arbitrary input
# allows SQL injection. 'value' itself is safely bound as a parameter.
# NOTE(review): the tstamp assignment is on a line not shown in this listing.
501 def setValue(self,dsName,taskName,fieldName,value):
502 """Update a field of a given task."""
504 cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', %s =' % fieldName,DbParam(value),
505 'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
506 return cursor.rowcount
# taskIterDict: generator yielding each selected TASKS row as a dict built by
# dictFactory(cursor, row). Stops when fetchone() returns None.
#   what  -- column list, inserted verbatim into 'select %s from TASKS'.
#   qual  -- extra SQL fragments (default: 'order by UPDATED'), presumably appended
#            to the query on a line not shown here.
#   limit -- maximum number of rows to yield. As the NOTE below explains, it is
#            enforced in Python rather than in SQL, presumably by a counter on lines
#            not shown here.
# NOTE(review): 'what' is inserted into the SQL text unescaped; pass only trusted
# column lists.
543 def taskIterDict(self, what='*', qual=('order by UPDATED',), limit=999999999):
544 # NOTE: We could implement this more elegantly using row_factory (sqlite3)
545 # or rowfactory (Oracle), but this would come at the expense of a
546 # direct database dependence. Also, the primitive limit on the
547 # number of items returned by the generator avoids database-
548 # dependent SQL limit statements (until limit in TaskManager.execute
549 # is fully implemented also for Oracle.
550 q = [ 'select %s from TASKS' % what ]
552 cursor = self.execute(q)
555 row = cursor.fetchone()
556 if row is None: break
558 yield dictFactory(cursor,row)
# getTaskDict: return the task (dsname, taskname) as a dict built by
# dictFactory(cursor, row) from the first fetched row.
#   what -- column list, inserted verbatim into the SELECT (unescaped; trusted input only).
#   qual -- extra SQL fragments; how they are used is not visible in this listing.
# Row selection differs by backend:
#   Oracle -- 'ROWNUM <= 1' is evaluated before 'order by UPDATED', so a single row
#             is picked before any sorting happens.
#   SQLite -- 'order by UPDATED limit 1' (ascending by default) returns the row with
#             the smallest (oldest) UPDATED.
# NOTE(review): the first 'q' appears to be overwritten on both backend branches
# below (the 'else:' line is not shown). The handling of a missing task
# (fetchone() returning None) is also on lines not shown.
561 def getTaskDict(self, dsname, taskname, what='*', qual=()):
562 # NOTE: We could implement this more elegantly using row_factory (sqlite3)
563 # or rowfactory (Oracle), but this would come at the expense of a
564 # direct database dependence.
566 q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME =', DbParam(taskname) ]
569 if self.dbtype=='oracle':
570 # WARNING: DOES THIS ALWAYS WORK? There anyway should only be a single task here?
571 q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'and ROWNUM <= 1 order by UPDATED' ]
573 q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'order by UPDATED limit 1' ]
574 cursor = self.execute(q)
575 row = cursor.fetchone()
579 return dictFactory(cursor,row)
# getTaskNames: return all (DSNAME, TASKNAME) rows whose names match the given SQL
# LIKE patterns, ordered by DSNAME, TASKNAME.
#   dsname, taskname -- LIKE patterns. With addWildCards=True they are wrapped in
#                       '%...%' for substring matching.
#   taskname         -- presumably, when falsy/None, only DSNAME is filtered (the
#                       branch conditions are on lines not shown here).
# Returns cursor.fetchall(), a list of rows.
# NOTE(review): dsname and taskname are %-formatted directly into the SQL string.
# A quote character in either breaks the query, and untrusted input allows SQL
# injection. Binding them as DbParam values, as the other methods do, would avoid
# both problems.
589 def getTaskNames(self,dsname,taskname=None,addWildCards=True):
590 """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
591 may be any partial dataset and task name, respectively."""
594 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' and TASKNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname,taskname) ]
596 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%%%s%%' order by DSNAME,TASKNAME" % (dsname) ]
599 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' and TASKNAME like '%s' order by DSNAME,TASKNAME" % (dsname,taskname) ]
601 q = [ "select DSNAME,TASKNAME from TASKS where DSNAME like '%s' order by DSNAME,TASKNAME" % (dsname) ]
602 return self.execute(q).fetchall()