ATLAS Offline Software
Loading...
Searching...
No Matches
TaskManager.py
Go to the documentation of this file.
1#!/usr/bin/env python
2
3# Copyright (C) 2002-2021 CERN for the benefit of the ATLAS collaboration
4
"""
TaskManager is a tool for keeping track of JobRunner jobs using a
database. TaskManager uses the notion of a task as the primary unit for
bookkeeping. Each task includes one or more jobs run using the same
JobRunner template and typically resulting from a single invocation of
JobRunner through one of the makeDPD.py or runDPD.py
scripts. TaskManager keeps track of the status of tasks and associated
jobs, runs postprocessing scripts, and accumulates results for later
analysis across tasks.

Written by Juerg Beringer (LBNL) in 2009.
"""
__author__ = 'Juerg Beringer'
__version__ = 'TaskManager.py atlas/athena'
19
20import time, os, glob
21
22from InDetBeamSpotExample.Utils import getRunFromName
23from InDetBeamSpotExample.Utils import getUserName
24
25# Exception classes
class TaskManagerCheckError(Exception):
    """Raised when a TaskManager check fails (e.g. no or ambiguous task match, user abort)."""
    pass
28
class TaskManagerDatabaseError(Exception):
    """Raised when executing an SQL statement against the task database fails."""
    pass
31
32
def dictFactory(cursor, row):
    """Convert a tuple from a database query into a dictionary.

    The keys are the column names taken from cursor.description (first element
    of each entry); the values are the corresponding entries of row.
    """
    return {column[0]: row[position]
            for position, column in enumerate(cursor.description)}
39
40
def getKey(d, v):
    """Return the key of dictionary d whose value equals v.

    Returns the string 'Undefined' if no key maps to v, or if more than one does.
    """
    matches = [key for key, value in d.items() if value == v]
    return matches[0] if len(matches) == 1 else 'Undefined'
49
50
def getStatusClass(status):
    """Classify a task status code as 'ok', 'warn' or 'bad'.

    Codes 7 (POSTPROCFAILED) and 11 (FAILED) are 'bad', 10 (COMPLETED) is 'ok',
    and every other value is 'warn'.
    """
    return {7: 'bad', 10: 'ok', 11: 'bad'}.get(status, 'warn')
62
63
def appendUnique(s,v):
    """Append word v to the space-separated list s unless it is already present.

    s may be None or empty, in which case the result is just v. Returns the
    resulting (possibly unchanged) string.
    """
    if not s:
        # Start a new list; joining v onto '' would leave a stray leading space.
        return v
    if v in s.split():
        return s
    return ' '.join([s, v])
70
71
def getFullTaskNames(taskman,dsname,taskname,requireSingleTask=False,confirmWithUser=False,addWildCards=True):
    """Resolve a possibly partial (dsname, taskname) pair to full task names.

    dsname and taskname may contain wildcards or be just a partial name such as
    a run number; they are passed to taskman.getTaskNames together with
    addWildCards. Returns the list of matching (DSNAME, TASKNAME) pairs.

    Raises TaskManagerCheckError if nothing matches, if requireSingleTask is set
    and more than one task matches, or if confirmWithUser is set and the user
    does not answer 'y' at the prompt.
    """
    taskList = taskman.getTaskNames(dsname,taskname,addWildCards)
    nTasks = len(taskList)
    if nTasks == 0:
        raise TaskManagerCheckError ('ERROR: No tasks found for dataset = %s, task = %s' % (dsname,taskname))

    rowFormat = ' %-50s %s'
    separator = ' %s' % (75*'-')

    if requireSingleTask and nTasks != 1:
        table = [rowFormat % ('DATASET NAME','TASK NAME'), separator]
        table += [rowFormat % (t[0],t[1]) for t in taskList]
        header = "ERROR: Multiple data set names found for dataset = %s, task = %s\n Please use full dataset or task name from list below, using option -n if necessary:\n\n" % (dsname,taskname)
        raise TaskManagerCheckError (header + ''.join(line + '\n' for line in table) + '\n')

    if confirmWithUser:
        print ('Please confirm that you want to execute this command for the following tasks:\n')
        print (rowFormat % ('DATASET NAME','TASK NAME'))
        print (separator)
        for t in taskList:
            print (rowFormat % (t[0],t[1]))
        answer = input('\nARE YOU SURE [n] ? ')
        if answer != 'y':
            raise TaskManagerCheckError ('ERROR: Aborted by user')
        print()

    return taskList
99
100
def getJobConfig(jobDir,dsName,taskName,jobName='*'):
    """Read config dict from job files.

    Looks in jobDir/dsName/taskName/jobName (glob patterns allowed; jobName
    defaults to any job) for a '*.config.py.final.py' file, falling back to
    '*.config.py'. The first file found is executed and the 'jobConfig' object
    it defines is returned.

    Raises KeyError if no config file is found or it does not define jobConfig.
    """
    config = {}
    jobPath = '%s/%s/%s/%s' % (jobDir,dsName,taskName,jobName)
    configFile = glob.glob(jobPath+'/*.config.py.final.py')
    if not configFile:
        configFile = glob.glob(jobPath+'/*.config.py')
    if configFile:
        # NOTE: the config file is executed as Python code; only use this on job
        # directories written by JobRunner, never on untrusted input.
        with open(configFile[0]) as f:
            exec(f.read(), config)   # Eval config file and put defs into config dict
    return config['jobConfig']
110
111
class DbParam(object):
    """A utility class to hold a single parameter to be used in SQL queries.

    Instances are placed between SQL string fragments in the lists passed to
    TaskManager.execute, which replaces each one by the placeholder syntax of
    the current database and binds self.value to it.
    """
    def __init__(self,value):
        self.value = value

    def __repr__(self):
        # repr() instead of '%r' % value: the latter treats a tuple value as the
        # format-argument tuple and then fails or mis-formats it.
        return repr(self.value)
118
119
class TaskManager:
    """TaskManager is a tool for keeping track of JobRunner jobs.

    Task bookkeeping is kept in a TASKS table, stored either in an SQLite file
    or in an Oracle database (see __init__ for the connection string format).
    Instances should be used inside a 'with' statement so that the database
    connection is closed on exit.
    """

    StatusCodes = { 'UNKNOWN': -1,
                    'CONFIGURED': 0,
                    'SUBMITTED': 1,
                    'RUNNING': 2,
                    'PARTIALFAILED': 3,
                    # Codes above are considered tasks whose status needs to
                    # be monitored and updated
                    'POSTPROCESSING': 5,
                    'POSTPROCRUNNING': 6,
                    'POSTPROCFAILED': 7,
                    'COMPLETED': 10,
                    'FAILED': 11}

    OnDiskCodes = { 'UNKNOWN': -1,
                    'ALLONDISK': 0,
                    'RESULTSONDISK': 1,
                    # Codes above are for jobs whose task directory is on disk
                    'ARCHIVED': 10,
                    'DELETED': 11 }

    @classmethod
    def parseConnectionInfo(cls, connstring=''):
        """Split a connection string into a (dbtype, dbname) pair.

        An empty connstring is taken from the TASKDB environment variable and
        defaults to 'sqlite_file:taskdata.db'. For 'auth_file:<path>' the actual
        connection string is read from the file <path>.

        Raises ValueError if the string (or the auth file content) has no ':'
        separator, or if the auth file cannot be read.
        """
        if not connstring:
            connstring = os.environ.get('TASKDB', 'sqlite_file:taskdata.db')

        try:
            dbtype, dbname = connstring.split(':', 1)
        except Exception as err:
            raise ValueError ('Illegal database connection string {}'.format(connstring)) from err

        if dbtype == 'auth_file':
            # dbname is a file with the actual connection information
            authfile = dbname
            try:
                with open(authfile) as af:
                    connstring = af.read().strip()
                dbtype, dbname = connstring.split(':', 1)
            except Exception as err:
                raise ValueError ('Invalid authorization file {} (not readable or invalid format)'.format(authfile)) from err

        return dbtype, dbname


    def __init__(self, connstring='', createDatabase=False):
        """Constructor. connstring is a connection string specifying either a SQLite file
        ("sqlite_file:..."), an Oracle database ("oracle:<cx_Oracle connect string>",
        e.g. 'oracle:USER/PASSWORD@HOST'), or an authorization file ("auth_file:...")
        with connection information. If connstring is empty, the connection
        information will be taken from TASKDB (if set), or otherwise defaults to
        'sqlite_file:taskdata.db'. If createDatabase is set, the schema is created.

        Raises ValueError for an unknown database type, for a missing SQLite file
        (unless createDatabase is set), or for an existing SQLite file when
        createDatabase is set."""

        self.debug = False
        self.paramstyle = None
        self.is_managing_context = False

        self.dbtype, self.dbname = self.__class__.parseConnectionInfo(connstring)

        if self.dbtype == 'sqlite_file':
            import sqlite3
            self.paramstyle = 'qmark'
            dbexists = os.access(self.dbname, os.F_OK)
            if dbexists and createDatabase:
                raise ValueError ('SQLite file {} exists already - remove before recreating'.format(self.dbname))
            if not (dbexists or createDatabase):
                raise ValueError ('TaskManager database not found (SQLite file {})'.format(self.dbname))
            self.dbcon = sqlite3.connect(self.dbname)
            if createDatabase:
                self._createSQLiteSchema()
        elif self.dbtype == 'oracle':
            import cx_Oracle
            self.paramstyle = 'named'
            try:
                self.dbcon = cx_Oracle.connect(self.dbname)
            except Exception:
                print ('ERROR: First connection attempt to Beam Spot Oracle database failed; will retry in 10s ...')
                time.sleep(10)
                self.dbcon = cx_Oracle.connect(self.dbname)
            if createDatabase:
                self._createOracleSchema()
        else:
            raise ValueError ('Unknown database type: {}'.format(self.dbtype))

    def __enter__(self):
        ''' Remember that we're inside a 'with' statement so we can warn otherwise: '''
        self.is_managing_context = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        ''' Close the database connection at the end of the 'with' statement.
        Exceptions raised inside the 'with' body are not suppressed. '''
        try:
            self.dbcon.close()
        except Exception:
            print ('ERROR: Unable to close database connection')

    def _createSQLiteSchema(self):
        """Create the database schema for a SQLite3 database."""
        self.dbcon.cursor().executescript("""
            create table tasks (
                TASKID integer primary key autoincrement,
                DSNAME text,
                RUNNR integer,
                TASKNAME text,
                TEMPLATE text,
                TASKPOSTPROCSTEPS text,
                ATLREL text,
                CREATED real,
                CREATED_USER text,
                CREATED_HOST text,
                UPDATED real,
                STATUS integer,
                ONDISK integer,
                LOGFILES text,
                COOLTAGS text,
                NRESULTFILES integer default 0,
                NJOBS integer default 0,
                NJOBS_SUBMITTED integer default -1,
                NJOBS_RUNNING integer default -1,
                NJOBS_POSTPROC integer default -1,
                NJOBS_FAILED integer default -1,
                NJOBS_COMPLETED integer default -1,
                RESULTFILES text,
                RESULTLINKS text,
                TASKCOMMENT text,
                STATUS_POSTPROC integer,
                BASEPATH text,
                AUXDATA text
            );
        """)
        self.dbcon.commit()


    def _createOracleSchema(self):
        """Create the database schema for an Oracle database, dropping any previous one."""
        # Drop each old schema object independently, so that a missing object
        # (e.g. no sequence yet) does not prevent the others from being dropped.
        for dropStatement in ('drop sequence TASKS_SEQ',
                              'drop trigger TASKS_ID_TRIGGER',
                              'drop table TASKS'):
            try:
                self.dbcon.cursor().execute(dropStatement)
            except Exception as e:
                print (e)
        self.dbcon.cursor().execute("""
            create table tasks (
                TASKID number(10,0),
                DSNAME varchar2(256),
                RUNNR number(10,0),
                TASKNAME varchar2(256),
                TEMPLATE varchar2(256),
                TASKPOSTPROCSTEPS varchar2(256),
                ATLREL varchar2(256),
                CREATED number,
                CREATED_USER varchar2(256),
                CREATED_HOST varchar2(256),
                UPDATED number,
                STATUS number(5,0),
                ONDISK number(5,0),
                LOGFILES varchar2(256),
                COOLTAGS varchar2(256),
                NRESULTFILES number(10,0) default 0,
                NJOBS number(10,0) default 0,
                NJOBS_SUBMITTED number(10,0) default -1,
                NJOBS_RUNNING number(10,0) default -1,
                NJOBS_POSTPROC number(10,0) default -1,
                NJOBS_FAILED number(10,0) default -1,
                NJOBS_COMPLETED number(10,0) default -1,
                RESULTFILES varchar2(4000),
                RESULTLINKS varchar2(4000),
                TASKCOMMENT varchar2(4000),
                STATUS_POSTPROC number(5,0) default -1,
                BASEPATH varchar2(256),
                AUXDATA varchar2(4000),
                constraint BEAMSPOT_TASKS_PK primary key (TASKID)
            )
        """)
        self.dbcon.cursor().execute('create index TASKS_DSNAME_INDX on TASKS(DSNAME)')
        self.dbcon.cursor().execute('create index TASKS_RUNNR_INDX on TASKS(RUNNR)')
        self.dbcon.cursor().execute('alter table TASKS add constraint TASKS_TNAME_DSNAME_UQ unique (TASKNAME, DSNAME)')
        self.dbcon.cursor().execute('create sequence TASKS_SEQ start with 1 increment by 1 nomaxvalue')
        # The following statement must not have whitespace at the beginning of the line, otherwise
        # the trigger function won't be parsed correctly:
        self.dbcon.cursor().execute("""
create or replace trigger TASKS_ID_TRIGGER
before insert on TASKS
for each row
begin
 select TASKS_SEQ.nextval into :new.TASKID from dual;
end;
""")
        self.dbcon.commit()


    def execute(self,statementParts, doCommit=False, limit=None):
        """Execute a SQL statement passed as a list or tuple of statement parts, where each
        part is either a partial SQL string, or an object of type DbParam specifying a parameter
        for the SQL statement. The actual SQL statement is assembled (using the parameter style
        of the current database) and executed, and the resulting cursor is returned.
        If doCommit is set, the transaction is committed after execution. Returns None if
        statementParts is empty.
        Loosely follows the method discussed in the Python Cookbook.
        WARNING: At present, limit doesn't work when ordering rows for Oracle!

        Raises TypeError if a plain string is passed, ValueError for invalid parts or an
        unknown parameter style, and TaskManagerDatabaseError if the database call fails."""
        if not self.is_managing_context:
            print ('**WARN** TaskManager will keep the database connection open until it is deleted.')
            print ('**INFO** TaskManager should generally only be used inside a with statement:')
            print ('**INFO** with TaskManager(...) as taskman:')
            print ('**INFO** # do something ...')

        if not statementParts:
            return None
        if isinstance(statementParts,str):
            raise TypeError ('Must pass list or tuple to TaskManager.execute')
        for p in statementParts:
            if not isinstance(p,(DbParam,str)):
                raise ValueError ('Can only pass SQL string fragments and DbParam objects in list '
                                  'to TaskManager.execute, found %s with "%s"' % (type(p),str(p)))

        if self.paramstyle=='qmark':
            sqlParts = []
            params = []
            for p in statementParts:
                if isinstance(p,DbParam):
                    sqlParts.append('?')
                    params.append(p.value)
                else:
                    sqlParts.append(p)
        elif self.paramstyle=='named':
            sqlParts = []
            params = {}
            for p in statementParts:
                if isinstance(p,DbParam):
                    name = 'p%d' % len(params)
                    sqlParts.append(':%s' % name)
                    params[name] = p.value
                else:
                    sqlParts.append(p)
        else:
            raise ValueError ('Unknown SQL parameter style %s' % self.paramstyle)
        sql = ' '.join(sqlParts)

        # Handle database-dependent limit clause (PRELIMINARY)
        if limit:
            if self.dbtype=='oracle':
                # WARNING: the following doesn't work always when ordering rows!
                if 'order by' in sql:   # WARNING: works only if order by is in lowercase!
                    sqlParts2 = sql.split('order by')
                    sql = '%s and ROWNUM <= %i order by %s' % (sqlParts2[0],limit,sqlParts2[1])
                else:
                    sql += ' and ROWNUM <= %i' % limit
            else:
                sql += ' limit %i' % limit

        if self.debug:
            print ('\nExecuting SQL statement: ',sql)
            print (' with parameters: ',params)

        cursor = self.dbcon.cursor()
        try:
            cursor.execute(sql,params)
            if doCommit:
                self.dbcon.commit()
        except Exception as e:
            msg = '\nDatabase error executing SQL statement\n %s\nusing parameters\n %s\n%s' % (sql,params,e)
            raise TaskManagerDatabaseError (msg) from e
        return cursor


    def deleteTask(self,dsName,taskName):
        """Delete a task entry from the taskmanager database. Returns the number of rows deleted."""
        q = [ 'delete from TASKS where DSNAME =',DbParam(dsName),'and TASKNAME =',DbParam(taskName) ]
        cursor = self.execute(q,True)
        return cursor.rowcount


    def addTask(self,dsName,taskName,template,release,njobs,
                taskpostprocsteps='',
                status=StatusCodes['SUBMITTED'],
                onDisk=OnDiskCodes['ALLONDISK'],
                createdTime=None,
                createdUser=None,
                createdHost=None,
                comment=''):
        """Add an entry for a new task if the task doesn't exist already.

        If the task exists (and uses the same template), its UPDATED, NJOBS (incremented
        by njobs) and ONDISK fields are updated, ATLREL is extended if release is new, and
        STATUS is updated only while the task status is below POSTPROCRUNNING. An existing
        task with a different template is left unchanged and an error is printed."""
        tstamp = time.time()
        nDefined = self.getNTasks(['where DSNAME =',DbParam(dsName),'and TASKNAME=',DbParam(taskName)])
        updateStatus = self.getNTasks(['where DSNAME = ',DbParam(dsName),'and TASKNAME=',DbParam(taskName),'and STATUS < %i' % TaskManager.StatusCodes['POSTPROCRUNNING']])

        if nDefined:
            # Task entry exists already; only update UPDATED, NJOBS, STATUS, ONDISK and possibly ATLREL
            task = self.getTaskDict(dsName,taskName)
            if task['TEMPLATE']!=template:
                print ('ERROR: Must not update task with different template: DSNAME = %s, TASKNAME = %s, templates = %s vs %s' % (dsName,taskName,task['TEMPLATE'],template))
            else:
                updateStr = ['update TASKS set UPDATED =',DbParam(tstamp),
                             ', NJOBS = ',DbParam(task['NJOBS']+njobs),
                             ', ONDISK = ',DbParam(onDisk)]

                oldRelease = task['ATLREL'] or ''   # ATLREL may be NULL in the database
                if release not in oldRelease:
                    print ('WARNING: Updating task using different release: DSNAME = %s, TASKNAME = %s, release = %s vs %s' % (dsName,taskName,oldRelease,release))
                    release = '; '.join([oldRelease,release]) if oldRelease else release
                    updateStr += [', ATLREL = ',DbParam(release)]

                if updateStatus:
                    updateStr += [', STATUS = ',DbParam(status)]

                updateStr += ['where DSNAME = ',DbParam(dsName),'and TASKNAME = ',DbParam(taskName)]

                # Commit like every other modifying statement; otherwise the update is
                # discarded if the connection is closed without a later commit.
                self.execute(updateStr, True)

        else:
            # New task entry
            if not createdTime:
                createdTime = tstamp
            if not createdUser:
                createdUser = getUserName()
            if not createdHost:
                createdHost = os.uname()[1]
            runnr = getRunFromName(dsName,None,True)
            self.execute( ['insert into TASKS (DSNAME,RUNNR,TASKNAME,TEMPLATE,TASKPOSTPROCSTEPS,ATLREL,CREATED,CREATED_USER,CREATED_HOST,UPDATED,STATUS,ONDISK,NJOBS,TASKCOMMENT) values (',
                           DbParam(dsName),',',
                           DbParam(runnr),',',
                           DbParam(taskName),',',
                           DbParam(template),',',
                           DbParam(taskpostprocsteps),',',
                           DbParam(release),',',
                           DbParam(createdTime),',',
                           DbParam(createdUser),',',
                           DbParam(createdHost),',',
                           DbParam(tstamp),',',
                           DbParam(status),',',
                           DbParam(onDisk),',',
                           DbParam(njobs),',',
                           DbParam(comment),')'],
                          True)


    def getStatus(self,dsName,taskName):
        """Get status of a task."""
        cursor = self.execute( ['select STATUS from TASKS where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName) ] )
        return cursor.fetchone()[0]


    def setStatus(self,dsName,taskName,status,oldStatus=None):
        """Set the status of a task. If oldStatus is given (including 0, i.e. CONFIGURED),
        the task is only updated if it currently has that status.
        Returns the number of rows updated."""
        tstamp = time.time()
        q = ['update TASKS set UPDATED =',DbParam(tstamp),', STATUS =',DbParam(status),
             'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
        # Compare against None: StatusCodes['CONFIGURED'] is 0, which is falsy.
        if oldStatus is not None:
            q += ['and STATUS =',DbParam(oldStatus)]
        cursor = self.execute(q, True)
        return cursor.rowcount


    def setDiskStatus(self,dsName,taskName,onDisk):
        """Update the onDisk-status of a task. Returns the number of rows updated."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', ONDISK =',DbParam(onDisk),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount


    def updateStatus(self,dsName,taskName,
                     status,nResultFiles,nJobs,nJobsSubmitted,nJobsRunning,nJobsPostProc,nJobsFailed,nJobsCompleted):
        """Update task status information including number of jobs in different states.
        Returns the number of rows updated."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),
                                ', STATUS =',DbParam(status),
                                ', NRESULTFILES =',DbParam(nResultFiles),
                                ', NJOBS =',DbParam(nJobs),
                                ', NJOBS_SUBMITTED =',DbParam(nJobsSubmitted),
                                ', NJOBS_RUNNING =',DbParam(nJobsRunning),
                                ', NJOBS_POSTPROC =',DbParam(nJobsPostProc),
                                ', NJOBS_FAILED =',DbParam(nJobsFailed),
                                ', NJOBS_COMPLETED =',DbParam(nJobsCompleted),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount


    def setValue(self,dsName,taskName,fieldName,value):
        """Update a field of a given task. Returns the number of rows updated.

        NOTE: fieldName is inserted into the SQL text (column names cannot be bound
        parameters), so it must be a trusted column name, never user-supplied text."""
        tstamp = time.time()
        cursor = self.execute( ['update TASKS set UPDATED =',DbParam(tstamp),', %s =' % fieldName,DbParam(value),
                                'where DSNAME =', DbParam(dsName), 'and TASKNAME =', DbParam(taskName)], True)
        return cursor.rowcount


    def getValue(self, what, qual=()):
        """Get a single value from the task database. If the query results in more than one value, only
        the first value is returned."""
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        return self.execute(q).fetchone()[0]


    def getCount(self, what, qual=()):
        """Return the number of rows returned by 'select <what> from TASKS <qual>'."""
        # Except when using count(distinct(...)) which is not supported by earlier sqlite versions,
        # better use getNTasks
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        return len(self.execute(q).fetchall())


    def getNTasks(self, qual=()):
        """Return the number of tasks matching the qualifier parts qual (e.g. a where clause)."""
        q = [ 'select count(*) from TASKS' ]
        q.extend(qual)
        return self.execute(q).fetchone()[0]

    def getTaskValue(self, dsName,taskName, what):
        """Get a single value from the task database. If the query results in more than one value, only
        the first value is returned."""
        q = [ 'select %s from TASKS where DSNAME =' % what , DbParam(dsName), 'and TASKNAME =', DbParam(taskName)]
        return self.execute(q).fetchone()[0]

    def taskIter(self, what='*', qual=('order by UPDATED',)):
        """Return an iterator over the result rows (as returned by fetchone) of a TASKS query."""
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        cursor = self.execute(q)
        return iter(cursor.fetchone,None)


    def taskIterDict(self, what='*', qual=('order by UPDATED',), limit=999999999):
        """Generator yielding up to limit result rows of a TASKS query as column-name dicts."""
        # NOTE: We could implement this more elegantly using row_factory (sqlite3)
        #       or rowfactory (Oracle), but this would come at the expense of a
        #       direct database dependence. Also, the primitive limit on the
        #       number of items returned by the generator avoids database-
        #       dependent SQL limit statements (until limit in TaskManager.execute
        #       is fully implemented also for Oracle.
        q = [ 'select %s from TASKS' % what ]
        q.extend(qual)
        cursor = self.execute(q)
        n = 0
        while n<limit:
            row = cursor.fetchone()
            if row is None: break
            n += 1
            yield dictFactory(cursor,row)


    def getTaskDict(self, dsname, taskname, what='*', qual=()):
        """Return the row of the given task as a column-name dict, or {} if there is none.
        Without qual, a single row is requested."""
        # NOTE: We could implement this more elegantly using row_factory (sqlite3)
        #       or rowfactory (Oracle), but this would come at the expense of a
        #       direct database dependence.
        if qual:
            q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME =', DbParam(taskname) ]
            q.extend(qual)
        else:
            if self.dbtype=='oracle':
                # WARNING: DOES THIS ALWAYS WORK? There anyway should only be a single task here?
                q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'and ROWNUM <= 1 order by UPDATED' ]
            else:
                q = [ 'select %s from TASKS where DSNAME =' % what, DbParam(dsname), 'and TASKNAME = ', DbParam(taskname), 'order by UPDATED limit 1' ]
        cursor = self.execute(q)
        row = cursor.fetchone()
        if row is None:
            return {}
        else:
            return dictFactory(cursor,row)


    def getDSNames(self,dsname):
        """Get all DSNAMEs for a given (partial) data set name dsname."""
        # The LIKE pattern is a bound parameter so that names containing quotes
        # cannot break (or inject into) the SQL statement.
        q = [ 'select distinct(DSNAME) from TASKS where DSNAME like', DbParam('%%%s%%' % (dsname,)), 'order by DSNAME' ]
        return self.execute(q).fetchall()


    def getTaskNames(self,dsname,taskname=None,addWildCards=True):
        """Find all matching tasks and return list of full (DSNAME,TASKNAME) pairs. dsname and taskname
        may be any partial dataset and task name, respectively. If addWildCards is False, they are
        used as SQL LIKE patterns as given. A false taskname matches any task."""
        if addWildCards:
            dsPattern = '%%%s%%' % (dsname,)
            taskPattern = '%%%s%%' % (taskname,)
        else:
            dsPattern = str(dsname)
            taskPattern = str(taskname)
        # Patterns are bound parameters so that quotes in names cannot break the SQL.
        q = [ 'select DSNAME,TASKNAME from TASKS where DSNAME like', DbParam(dsPattern) ]
        if taskname:
            q += [ 'and TASKNAME like', DbParam(taskPattern) ]
        q.append('order by DSNAME,TASKNAME')
        return self.execute(q).fetchall()
603
604
class TaskAnalyzer:
    """TaskAnalyzer is a class for analyzing the files of a given task.

    The task directory is jobDir/dsName/taskName. Each job has its own
    subdirectory there, and its state is read from marker files matching
    *.config.py, *.SUBMITTED, *.RUNNING, *.POSTPROCESSING, *.COMPLETED and
    *.exit.0.
    """

    def __init__(self,jobDir,dsName,taskName):
        self.jobDir = jobDir
        self.dsName = dsName
        self.taskName = taskName
        self.path = jobDir+'/'+dsName+'/'+taskName

        self.status = TaskManager.StatusCodes['UNKNOWN']
        self.onDisk = TaskManager.OnDiskCodes['UNKNOWN']
        self.nResultFiles = 0
        self.nJobs = 0
        self.nJobsSubmitted = 0
        self.nJobsRunning = 0
        self.nJobsPostProc = 0
        self.nJobsFailed = 0
        self.nJobsCompleted = 0


    def isOnDisk(self):
        """Check if the task directory is still on disk.

        Sets self.onDisk to DELETED if it is missing (otherwise self.onDisk is not
        changed). Returns True if the directory exists."""
        onDisk = os.path.exists(self.path)
        if not onDisk:
            self.onDisk = TaskManager.OnDiskCodes['DELETED']
        return onDisk


    def _countFiles(self, pattern):
        """Return the number of entries matching pattern below the task directory."""
        return len(glob.glob(self.path+pattern))


    def _deriveStatus(self):
        """Map the current job counters onto a TaskManager status code."""
        codes = TaskManager.StatusCodes
        status = codes['UNKNOWN']
        if self.nJobs>0:
            status = codes['CONFIGURED']
            # Only a task that actually has jobs can be complete (0 == 0 otherwise).
            if self.nJobs==self.nJobsCompleted:
                status = codes['COMPLETED']
        # Later checks take precedence over earlier ones.
        if self.nJobsSubmitted>0:
            status = codes['SUBMITTED']
        if self.nJobsRunning>0:
            status = codes['RUNNING']
        if self.nJobsPostProc>0:
            status = codes['POSTPROCESSING']
        if self.nJobsFailed>0:
            stillActive = self.nJobsSubmitted>0 or self.nJobsRunning>0 or self.nJobsPostProc>0
            status = codes['PARTIALFAILED'] if stillActive else codes['FAILED']
        return status


    def analyzeFiles(self):
        """Count the job marker files of the task and derive the task status.

        Returns True if the task directory exists. If it does not, counters and
        status are left unchanged."""
        onDisk = self.isOnDisk()
        if onDisk:
            self.nJobs = self._countFiles('/*/*.config.py')
            # Everything in the task directory that is not a job counts as a result file.
            self.nResultFiles = self._countFiles('/*') - self.nJobs
            self.nJobsSubmitted = self._countFiles('/*/*.SUBMITTED')
            self.nJobsRunning = self._countFiles('/*/*.RUNNING')
            self.nJobsPostProc = self._countFiles('/*/*.POSTPROCESSING')
            self.nJobsCompleted = self._countFiles('/*/*.COMPLETED')
            # Completed jobs without a zero exit code are counted as failed.
            self.nJobsFailed = self.nJobsCompleted - self._countFiles('/*/*.exit.0')
            self.status = self._deriveStatus()
        return onDisk


    def updateStatus(self,taskman):
        """Write the analyzed status and job counters of this task to taskman."""
        taskman.updateStatus(self.dsName,self.taskName,
                             self.status,self.nResultFiles,
                             self.nJobs,self.nJobsSubmitted,self.nJobsRunning,self.nJobsPostProc,self.nJobsFailed,self.nJobsCompleted)
669
670
671
class JobAnalyzer:
    """JobAnalyzer is a class for analyzing the jobs of a given task.

    Jobs live in subdirectories of jobDir/dsName/taskName; their state is read
    from marker files in each job directory.
    """

    def __init__(self,jobDir,dsName,taskName):
        self.jobDir = jobDir
        self.dsName = dsName
        self.taskName = taskName
        self.path = jobDir+'/'+dsName+'/'+taskName
        # NOTE: a maximum wall time (self.maxWallTime = 5*86400 s) was once considered
        # for flagging jobs that die without updating their marker files.


    def jobList(self):
        """Return the names of the entries in the task directory, or [] if it cannot be listed."""
        try:
            return os.listdir(self.path)
        except Exception:
            # Best effort: a missing or unreadable task directory simply has no jobs.
            return []


    def jobStatus(self,jobName):
        """Determine the status of a single job from the marker files in its directory.

        Returns a (status, exitcode) tuple, where status is a TaskManager.StatusCodes
        value and exitcode is the content of the job's *.exitstatus.dat file, or ''
        if there is none or it cannot be read."""
        status = TaskManager.StatusCodes['UNKNOWN']
        exitcode = ''
        p = self.path+'/'+jobName
        if os.path.exists(p):
            # Later marker files take precedence over earlier ones.
            if glob.glob(p+'/*.config.py'):
                status = TaskManager.StatusCodes['CONFIGURED']
            if glob.glob(p+'/*.SUBMITTED'):
                status = TaskManager.StatusCodes['SUBMITTED']
            # NOTE: jobs dying without updating their marker files stay RUNNING here; a
            # wall-time check on the *.RUNNING file would need similar protection in
            # TaskAnalyzer.
            if glob.glob(p+'/*.RUNNING'):
                status = TaskManager.StatusCodes['RUNNING']
            if glob.glob(p+'/*.POSTPROCESSING'):
                status = TaskManager.StatusCodes['POSTPROCESSING']
            if glob.glob(p+'/*.COMPLETED'):
                status = TaskManager.StatusCodes['COMPLETED']
                # Check if job completed w/o producing an exit status
                if not glob.glob(p+'/*.exit.0'):
                    status = TaskManager.StatusCodes['FAILED']
        exitFiles = glob.glob(p+'/*.exitstatus.dat')
        if exitFiles:
            try:
                with open(exitFiles[0]) as f:
                    exitcode = f.read()
            except (OSError, ValueError):
                # Unreadable or undecodable exit status file: keep exitcode empty.
                pass
        # Any *.exit.* file besides *.exit.0 marks the job as failed.
        if len(glob.glob(p+'/*.exit.*'))>len(glob.glob(p+'/*.exit.0')):
            status = TaskManager.StatusCodes['FAILED']
        return (status,exitcode)
723
724
#
# Manual test code for this module
#
if __name__ == '__main__':
    # Nothing is run by default. Example usage:
    #   t = TaskManager('sqlite_file:test.db', True)   # create a new SQLite database
    #   t = TaskManager('sqlite_file:test.db')         # open an existing one
    #   t = TaskManager('oracle:USER/PASSWORD@HOST')
    #   t.addTask('foo','bar','VertexTemplate.py','15.4.0',55)
    #   t.addTask('foo','bbb','VertexTemplate.py','15.4.0',55)
    pass