ATLAS Offline Software
trfUtils.py
# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration

## @package PyJobTransforms.trfUtils
#  @brief Transform utility functions

import os
import os.path as path
import re
import signal
import sys
import tarfile
import time
import uuid
import socket

import multiprocessing
import base64

from datetime import datetime
from subprocess import Popen, STDOUT, PIPE
from xml.dom import minidom
from xml.parsers.expat import ExpatError
from xml.etree import ElementTree

from PyJobTransforms.trfExitCodes import trfExit
import PyJobTransforms.trfExceptions as trfExceptions

import logging
from functools import reduce
msg = logging.getLogger(__name__)


## @brief Find a named file along a colon separated PATH type variable
def findFile(pathvar, fname):
    # First see if the file already includes a path.
    msg.debug('Finding full path for {fileName} in path {path}'.format(
        fileName = fname,
        path = pathvar
    ))
    if fname.startswith('/'):
        return(fname)

    # Split the path.
    pathElements = pathvar.split(':')
    for pathElement in pathElements:
        if path.exists(path.join(pathElement, fname)):
            return(path.join(pathElement, fname))

    return(None)
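
# Illustrative usage sketch (an editorial addition, not part of the original
# module); the environment variable and file name here are only examples:
#
#   supp = findFile(os.environ.get('DATAPATH', ''), 'root.supp')
#   if supp is None:
#       msg.warning('root.supp not found along DATAPATH')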


## @brief List all processes and parents and form a dictionary where the
#  parent key lists all child PIDs
def getAncestry(listMyOrphans = False):
    psCmd = ['ps', 'ax', '-o', 'pid,ppid,pgid,args', '-m']

    try:
        msg.debug('Executing %s', psCmd)
        p = Popen(psCmd, stdout=PIPE, stderr=PIPE)
        stdout = p.communicate()[0]
        psPID = p.pid
    except OSError as e:
        msg.error('Failed to execute "ps" to get process ancestry: %s', repr(e))
        raise

    childDict = {}
    myPgid = os.getpgrp()
    myPid = os.getpid()
    for line in stdout.decode().split('\n'):
        try:
            (pid, ppid, pgid, cmd) = line.split(None, 3)
            pid = int(pid)
            ppid = int(ppid)
            pgid = int(pgid)
            # Ignore the ps process
            if pid == psPID:
                continue
            if ppid in childDict:
                childDict[ppid].append(pid)
            else:
                childDict[ppid] = [pid]
            if listMyOrphans and ppid == 1 and pgid == myPgid:
                msg.info("Adding PID {0} to list of my children as it seems to be orphaned: {1}".format(pid, cmd))
                if myPid in childDict:
                    childDict[myPid].append(pid)
                else:
                    childDict[myPid] = [pid]

        except ValueError:
            # Not a nice line
            pass
    return childDict


## @brief Find all the children of a particular PID (calls itself recursively
#  to descend into each leaf)
def listChildren(psTree = None, parent = os.getpid(), listOrphans = False): # noqa: B008 (PID is constant)
    '''Take a psTree dictionary and list all children'''
    if psTree is None:
        psTree = getAncestry(listMyOrphans = listOrphans)

    msg.debug("List children of %d (%s)", parent, psTree.get(parent, []))
    children = []
    if parent in psTree:
        children.extend(psTree[parent])
        for child in psTree[parent]:
            children.extend(listChildren(psTree, child))
    children.reverse()
    return children


## @brief Kill all PIDs in a given list
def infanticide(childPIDs = None, sleepTime = 3, message = True, listOrphans = False):
    if childPIDs is None:
        childPIDs = listChildren(listOrphans = listOrphans)

    if len(childPIDs) > 0 and message:
        msg.info('Killing these child processes: {0}...'.format(childPIDs))

    for pid in childPIDs:
        try:
            os.kill(pid, signal.SIGTERM)
        except OSError:
            pass

    time.sleep(sleepTime)

    for pid in childPIDs:
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # OSError happens when the process no longer exists - harmless
            pass
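
# Illustrative sketch (editorial addition): getAncestry, listChildren and
# infanticide combine to clean up a transform's process tree:
#
#   pids = listChildren(parent=os.getpid())   # deepest descendants first
#   infanticide(pids, sleepTime=3)            # SIGTERM, pause, then SIGKILL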


def call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10):

    def logProc(p):
        line = p.stdout.readline()
        if line:
            line = "%s%s" % (message, line.rstrip())
            if logger is None:
                print(line)
            else:
                logger.log(loglevel, line)

    def flushProc(p):
        line = p.stdout.readline()
        while line:
            line = "%s%s" % (message, line.strip())
            if logger is None:
                print(line)
            else:
                logger.log(loglevel, line)
            line = p.stdout.readline()

    if loglevel is None:
        loglevel = logging.DEBUG

    if timeout is None or timeout <= 0: # no timeout set
        msg.info('Executing %s...', args)
        starttime = time.time()
        p = Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
        while p.poll() is None:
            logProc(p)
        flushProc(p)
        if timeout is not None:
            msg.info('Executed call within %d s.', time.time()-starttime)
        return p.returncode

    else: # timeout set
        n = 0
        while n <= retry:
            msg.info('Try %i out of %i (time limit %ss) to call %s.', n+1, retry+1, timeout, args)
            starttime = time.time()
            endtime = starttime + timeout
            p = Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
            while p.poll() is None and time.time() < endtime:
                logProc(p)
            if p.poll() is None:
                msg.warning('Timeout limit of %d s reached. Kill subprocess and its children.', timeout)
                parent = p.pid
                pids = [parent]
                pids.extend(listChildren(parent=parent))
                infanticide(pids)
                msg.info('Checking if something is left in buffer.')
                flushProc(p)
                if n != retry:
                    msg.info('Going to sleep for %d s.', sleeptime)
                    time.sleep(sleeptime)
                n += 1
                timeout *= timefactor
                sleeptime *= timefactor
            else:
                flushProc(p)
                msg.info('Executed call within %d s.', time.time()-starttime)
                return p.returncode

        msg.warning('All %i tries failed!', n)
        raise Exception


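# Illustrative sketch (editorial addition): run a command through call() with
# a 60 s limit and up to two retries, each retry scaling the limit by 1.5:
#
#   rc = call(['ls', '-l'], message='ls: ', timeout=60, retry=2, timefactor=1.5)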

## @brief Return a string with a report of the current athena setup
def asetupReport():
    setupMsg = str()
    eVars = ['AtlasBaseDir', 'AtlasProject', 'AtlasVersion', 'AtlasPatch', 'AtlasPatchVersion', 'CMTCONFIG', 'TestArea']
    if "AtlasProject" in os.environ:
        CMake_Platform = "{0}_PLATFORM".format(os.environ["AtlasProject"])
        if CMake_Platform in os.environ:
            eVars.remove("CMTCONFIG")
            eVars.append(CMake_Platform)
    for eVar in eVars:
        if eVar in os.environ:
            setupMsg += '\t%s=%s\n' % (eVar, os.environ[eVar])
    # Look for patches so that the job can be rerun
    if 'WorkDir_DIR' in os.environ and os.access(os.environ['WorkDir_DIR'], os.R_OK):
        pass
        # lstags is obsolete with git releases.
        # setupMsg += "\n\tPatch packages are:\n"
        # try:
        #     cmd = ['lstags']
        #     lstagsOut = Popen(cmd, shell = False, stdout = PIPE, stderr = STDOUT, bufsize = 1).communicate()[0]
        #     setupMsg += "\n".join([ "\t\t{0}".format(pkg) for pkg in lstagsOut.decode().split("\n") ])
        # except (CalledProcessError, OSError) as e:
        #     setupMsg += 'Execution of lstags failed: {0}'.format(e)
    else:
        setupMsg += "No readable patch area found"

    return setupMsg.rstrip()


## @brief Test (to the best of our knowledge) if the current release is older
#  than a major, minor version number
def releaseIsOlderThan(major, minor=None):
    if 'AtlasVersion' not in os.environ or 'AtlasBaseDir' not in os.environ:
        msg.warning("Could not find 'AtlasVersion' and 'AtlasBaseDir' in the environment - no release match possible")
        return False
    try:
        # First try AtlasVersion, which is clean
        relRegExp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
        relMatch = re.match(relRegExp, os.environ['AtlasVersion'])
        if not relMatch:
            # Now try the final part of AtlasBaseDir
            leafDir = path.basename(os.environ['AtlasBaseDir'])
            relMatch = re.match(relRegExp, leafDir)
            if not relMatch:
                msg.info('No identifiable numbered release found from AtlasVersion or AtlasBaseDir - assuming dev/devval/mig')
                return False

        relmajor = int(relMatch.group('major'))
        relminor = int(relMatch.group('minor'))
        msg.info('Detected release major {0}, minor {1} (.{2}) from environment'.format(relmajor, relminor, relMatch.group('other')))

        # Major beats minor, so test this first
        if relmajor < major:
            return True
        if relmajor > major:
            return False

        # First case is major equality and don't care about minor
        if minor is None or relminor >= minor:
            return False
        return True

    except Exception as e:
        msg.warning('Exception thrown when attempting to detect athena version ({0}). No release check possible'.format(e))
        return False
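
# Illustrative sketch (editorial addition): guard version-dependent behaviour,
# here enabling something only for releases 22.0 and newer:
#
#   if not releaseIsOlderThan(22, 0):
#       msg.info('Release is 22.0 or newer')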


## @brief Test (to the best of our knowledge) if the asetup release is older
#  than a major, minor version number
def asetupReleaseIsOlderThan(asetup_string, major, minor=None):
    try:
        relmajor = None
        relminor = None

        # First split the asetup_string by comma
        split_string = asetup_string.split(',')
        # master is always the newest
        if 'master' in split_string:
            return False

        # First try major.minor.bugfix
        reg_exp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
        for part in split_string:
            part = part.strip()
            match = re.match(reg_exp, part)
            if match:
                relmajor = int(match.group('major'))
                relminor = int(match.group('minor'))
                msg.info('Detected asetup release {0}.{1}(.{2})'.format(relmajor, relminor, match.group('other')))
                break

        # Then try major.minor
        if relmajor is None:
            reg_exp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)')
            for part in split_string:
                part = part.strip()
                match = re.match(reg_exp, part)
                if match:
                    relmajor = int(match.group('major'))
                    relminor = int(match.group('minor'))
                    msg.info('Detected asetup release {0}.{1}'.format(relmajor, relminor))
                    break

        # Bail out
        if relmajor is None:
            raise RuntimeError('asetup version could not be parsed')

        # Major beats minor, so test this first
        if relmajor < major:
            return True
        if relmajor > major:
            return False

        # First case is major equality and don't care about minor
        if minor is None or relminor >= minor:
            return False
        return True

    except Exception as e:
        msg.warning('Exception thrown when attempting to detect asetup athena version ({0}) from {1}. No release check possible'.format(e, asetup_string))
        return False


## @brief Quote a string array so that it can be echoed back on the command
#  line in a cut 'n' paste safe way
def shQuoteStrings(strArray = sys.argv):
    return [ "'" + qstring.replace("'", "\\'") + "'" for qstring in strArray ]


## @brief Generator to return lines and line count from a file
def lineByLine(filename, strip=True, removeTimestamp=True, substepName=None):
    linecounter = 0
    encargs = {'encoding' : 'utf8'}
    f = open(filename, 'r', **encargs)
    for line in f:
        linecounter += 1
        if substepName and isinstance(substepName, str): # Remove substepName only if caller provides that string.
            line = line.lstrip(substepName)
        if removeTimestamp:
            line = line.lstrip('0123456789:-, ') # Remove timestamps in both serial and MP mode.
        if strip:
            line = line.strip()
        yield line, linecounter
    f.close()
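
# Illustrative sketch (editorial addition): scan a log file while keeping
# track of the line count (the file name is hypothetical):
#
#   for line, lineNumber in lineByLine('log.RAWtoESD'):
#       if 'ERROR' in line:
#           msg.info('ERROR at line {0}: {1}'.format(lineNumber, line))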


## @brief XML pretty print an ElementTree.Element object
def prettyXML(element, indent = ' ', poolFileCatalogFormat = False):
    # Use minidom for pretty printing
    # See http://broadcast.oreilly.com/2010/03/pymotw-creating-xml-documents.html
    xmlstring = ElementTree.tostring(element, 'utf-8')
    try:
        metadataDoc = minidom.parseString(xmlstring)
    except ExpatError:
        # Getting weird \x00 NULLs on the end of some GUIDs, which
        # minidom.parseString does not like (is this APR?)
        msg.warning('Error parsing ElementTree string - will try removing hex literals ({0!r})'.format(xmlstring))
        xmlstring = xmlstring.replace('\x00', '')
        metadataDoc = minidom.parseString(xmlstring)

    if poolFileCatalogFormat is False:
        return metadataDoc.toprettyxml(indent=indent, encoding='UTF-8')

    # Now create a new document with the correct doctype for classic POOLFILECATALOG
    # See http://stackoverflow.com/questions/2337285/set-a-dtd-using-minidom-in-python
    imp = minidom.DOMImplementation()
    doctype = imp.createDocumentType(qualifiedName='POOLFILECATALOG', publicId='', systemId='InMemory')
    doc = imp.createDocument(None, 'POOLFILECATALOG', doctype)

    # Cut and paste the parsed document into the new one
    # See http://stackoverflow.com/questions/1980380/how-to-render-a-doctype-with-pythons-xml-dom-minidom
    refel = doc.documentElement
    for child in metadataDoc.childNodes:
        if child.nodeType == child.ELEMENT_NODE:
            doc.replaceChild(doc.importNode(child, True), doc.documentElement)
            refel = None
        elif child.nodeType != child.DOCUMENT_TYPE_NODE:
            doc.insertBefore(doc.importNode(child, True), refel)

    return doc.toprettyxml(indent=indent, encoding='UTF-8')


## @brief Return an ISO formatted 'now' string
def isodate():
    return datetime.now().replace(microsecond=0).isoformat()


## @brief Strip a string down to alphanumeric characters only
def forceToAlphaNum(string):
    if string is None or string.isalnum():
        return string
    newstring = ''
    for piece in string:
        if piece.isalnum():
            newstring += piece
    msg.warning("String {0} was stripped to alphanumeric characters only: {1}".format(string, newstring))
    return newstring


## @brief Compare metadata for files, taking into account that the GUID may vary
def cmpMetadata(metadata1, metadata2, guidCheck = 'valid'):
    # First check we have the same files
    allFiles = set(metadata1) | set(metadata2)
    if len(allFiles) > len(metadata1) or len(allFiles) > len(metadata2):
        msg.warning('In metadata comparison file lists are not equal - fails ({0} != {1})'.format(metadata1, metadata2))
        return False
    for fname in allFiles:
        allKeys = set(metadata1[fname]) | set(metadata2[fname])
        if len(allKeys) > len(metadata1[fname]) or len(allKeys) > len(metadata2[fname]):
            msg.warning('In metadata comparison key lists are not equal - fails')
            return False
        for key in allKeys:
            if key == 'file_guid':
                if guidCheck == 'ignore':
                    continue
                elif guidCheck == 'equal':
                    if metadata1[fname]['file_guid'].upper() == metadata2[fname]['file_guid'].upper():
                        continue
                    else:
                        msg.warning('In metadata comparison strict GUID comparison failed.')
                        return False
                elif guidCheck == 'valid':
                    try:
                        uuid.UUID(metadata1[fname]['file_guid'])
                        uuid.UUID(metadata2[fname]['file_guid'])
                        continue
                    except ValueError:
                        msg.warning('In metadata comparison found invalid GUID strings.')
                        return False
            if metadata1[fname][key] != metadata2[fname][key]:
                msg.warning('In metadata comparison found different key values: {0!s} != {1!s}'.format(metadata1[fname][key], metadata2[fname][key]))
                return False
    return True
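
# Illustrative sketch (editorial addition): two metadata dictionaries agree
# under guidCheck='valid' as long as both GUIDs are well-formed UUID strings:
#
#   md1 = {'f.root': {'nentries': 100, 'file_guid': str(uuid.uuid4())}}
#   md2 = {'f.root': {'nentries': 100, 'file_guid': str(uuid.uuid4())}}
#   cmpMetadata(md1, md2, guidCheck='valid')   # -> True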


## @brief Unpack a given tarfile
def unpackTarFile(filename, directory="."):
    try:
        tar = tarfile.open(filename)
        tar.extractall(path=directory)
        tar.close()
    except Exception as e:
        errMsg = 'Error encountered while unpacking {0} to {1}: {2}'.format(filename, directory, e)
        msg.error(errMsg)
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'), errMsg)


## @brief Ensure that the DBRelease tarball has been unpacked
def unpackDBRelease(tarball, dbversion=None):
    if dbversion is None:
        dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(tarball))
        if dbdMatch is None:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                        'Could not find a valid version in the DBRelease tarball: {0}'.format(tarball))
        dbversion = dbdMatch.group(1)
    dbsetup = path.abspath(path.join("DBRelease", dbversion, "setup.py"))
    if os.access(dbsetup, os.R_OK):
        msg.debug('DBRelease {0} is already unpacked, found {1}'.format(tarball, dbsetup))
        return False, dbsetup
    else:
        msg.debug('Will attempt to unpack DBRelease {0}'.format(tarball))
        unpackTarFile(tarball)
        msg.info('DBRelease {0} was unpacked'.format(tarball))
        if not os.access(dbsetup, os.R_OK):
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                        'DBRelease setup file {0} was not readable, even after unpacking {1}'.format(dbsetup, tarball))
        return True, dbsetup


## @brief Run a DBRelease setup
def setupDBRelease(setup):
    try:
        dbdir = path.abspath(path.dirname(setup))
        msg.debug('Will add {0} to sys.path to load DBRelease setup module'.format(dbdir))
        # N.B. We cannot use __import__ because the X.Y.Z directory name is illegal for a python module path
        opath = sys.path
        sys.path.insert(0, dbdir)
        from setup import Setup
        # Instantiate the Setup module, which activates the customisation
        Setup(dbdir)
        sys.path = opath
        msg.debug('DBRelease setup module was initialised successfully')
    except ImportError as e:
        errMsg = 'Import error while trying to load DB Setup module: {0}'.format(e)
        msg.error(errMsg)
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'), errMsg)
    except Exception as e:
        errMsg = 'Unexpected error while trying to load DB Setup module: {0}'.format(e)
        msg.error(errMsg)
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'), errMsg)


## @brief Validate that a DBRelease exists on cvmfs and return the path to
#  the setup script
def cvmfsDBReleaseCheck(dbrelease):
    dbsetup = None
    dbdMatch = re.match(r'([\d\.]+|current)$', dbrelease)
    msg.debug('Attempting to setup DBRelease {0} from cvmfs'.format(dbrelease))
    if dbdMatch:
        if 'VO_ATLAS_SW_DIR' in os.environ:
            msg.debug('Found site defined path to ATLAS software: {0}'.format(os.environ['VO_ATLAS_SW_DIR']))
            dbsetup = path.join(os.environ['VO_ATLAS_SW_DIR'], 'database', 'DBRelease', dbrelease, 'setup.py')
            if os.access(dbsetup, os.R_OK):
                return dbsetup
            msg.warning('Site defined path to ATLAS software seems invalid (failed to access {0}). Will also try standard cvmfs path.'.format(dbsetup))
        else:
            msg.debug('Using standard CVMFS path to ATLAS software')

        dbsetup = path.join('/cvmfs/atlas.cern.ch/repo/sw/database/DBRelease', dbrelease, 'setup.py')
        if not os.access(dbsetup, os.R_OK):
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                        'CVMFS DBRelease setup file {0} was not readable'.format(dbsetup))
        msg.debug('Using cvmfs based dbrelease: {0}'.format(path.dirname(dbsetup)))
    else:
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                    'Unable to interpret DBRelease "{0}" as either a tarball or a CVMFS release directory'.format(dbrelease))
    return dbsetup
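
# Illustrative sketch (editorial addition): the usual DBRelease flow is either
# unpack-then-setup for a tarball, or a direct cvmfs lookup (the version
# numbers here are examples):
#
#   unpacked, dbsetup = unpackDBRelease('DBRelease-31.8.1.tar.gz')
#   setupDBRelease(dbsetup)
#
#   dbsetup = cvmfsDBReleaseCheck('31.8.1')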


## @brief Dump a list of arguments to the pickle file given in the
#  'dumpPickle' argument
def pickledDump(argdict):
    if 'dumpPickle' not in argdict:
        return

    from PyJobTransforms.trfArgClasses import argument
    theArgumentDictionary = {}
    for k, v in argdict.items():
        if k == 'dumpPickle':
            continue
        if isinstance(v, argument):
            theArgumentDictionary[k] = getattr(v, "dumpvalue", v.value)
        else:
            theArgumentDictionary[k] = v
    with open(argdict['dumpPickle'], 'wb') as pickleFile:
        import pickle
        pickle.dump(theArgumentDictionary, pickleFile)


## @brief Dump a list of arguments to the JSON file given in the 'dumpJSON'
#  argument
def JSONDump(argdict):
    if 'dumpJSON' not in argdict:
        return

    from PyJobTransforms.trfArgClasses import argument
    theArgumentDictionary = {}
    for k, v in argdict.items():
        if k == 'dumpJSON':
            continue
        if isinstance(v, argument):
            theArgumentDictionary[k] = getattr(v, "dumpvalue", v.value)
        else:
            theArgumentDictionary[k] = v
    with open(argdict['dumpJSON'], 'w') as JSONFile:
        import json
        json.dump(theArgumentDictionary, JSONFile, sort_keys=True, indent=2)
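
# Illustrative sketch (editorial addition): both dumpers write every argument
# except their own trigger key, so this writes {"maxEvents": 10} to args.json:
#
#   JSONDump({'maxEvents': 10, 'dumpJSON': 'args.json'})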


## @brief Recursively convert unicode to str, useful when we have just loaded
#  something from json
def convertToStr(in_string):
    if isinstance(in_string, dict):
        return dict([(convertToStr(key), convertToStr(value)) for key, value in in_string.items()])
    elif isinstance(in_string, list):
        return [convertToStr(element) for element in in_string]
    # Unicode is always str in Python3, but bytes are not
    # TODO: remove unicode comparison after Python 3 migration
    elif in_string.__class__.__name__ == 'unicode':
        return in_string.encode('utf-8')
    elif in_string.__class__.__name__ == 'bytes':
        return in_string.decode('utf-8')
    else:
        return in_string


## @brief Convert a command line option to the dictionary key that will be
#  used by argparse
def cliToKey(option):
    return option.lstrip('-').replace('-', '_')


## @brief Print in a human-readable way the items of a given object
def printHR(the_object):
    # dictionary
    if isinstance(the_object, dict):
        for key, value in sorted(the_object.items()):
            print(u'{key}: {value}'.format(key = key, value = value))
    # list or tuple
    elif isinstance(the_object, (list, tuple)):
        for element in the_object:
            print(element)
    # other
    else:
        print(the_object)


## @brief Return a URL-safe, base64 encoded pseudorandom UUID
def uniqueIdentifier():
    # Decode the base64 bytes to str before stripping the '=' padding
    # (bytes.strip() would not accept a str argument under Python 3).
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).decode().strip("=")


## @brief Return either singular or plural units as appropriate for a given
#  quantity
def units(
    quantity = None,
    unitSingular = "unit",
    unitPlural = "units"
    ):
    if quantity == 1:
        return unitSingular
    else:
        return unitPlural
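
# Illustrative usage (editorial addition):
#
#   units(quantity=1)                      # -> 'unit'
#   units(quantity=8, unitSingular='process', unitPlural='processes')
#                                          # -> 'processes'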


# @brief returns if the current job is running in an interactive environment
def isInteractiveEnv():
    isInteractiveEnv = False
    # PS1 is for sh, bash; prompt is for tcsh and zsh
    if 'PS1' in os.environ or 'prompt' in os.environ:
        isInteractiveEnv = True
    elif os.isatty(sys.stdout.fileno()) or os.isatty(sys.stdin.fileno()):
        isInteractiveEnv = True

    return isInteractiveEnv


## @brief Job: a set of pieces of information relevant to a given work function
class Job(object):

    ## @brief initialisation method
    def __init__(
        self,
        workFunction = None,
        workFunctionKeywordArguments = {},
        workFunctionTimeout = None,
        name = None,
        ):
        self.workFunction = workFunction
        self.workFunctionKeywordArguments = workFunctionKeywordArguments
        self.workFunctionTimeout = workFunctionTimeout
        self.className = self.__class__.__name__
        self.resultGetter = None
        if name is None:
            self._name = uniqueIdentifier()
        else:
            self._name = name
        if self.workFunction is None:
            exceptionMessage = "work function not specified"
            msg.error("{notifier}: exception message: {exceptionMessage}".format(
                notifier = self.className,
                exceptionMessage = exceptionMessage
            ))
            raise trfExceptions.TransformInternalException(
                trfExit.nameToCode('TRF_INTERNAL'),
                exceptionMessage
            )

    @property
    def name(self):
        return self._name

    ## @brief Return an object self-description string
    def __str__(self):
        descriptionString = ""
        for key, value in sorted(vars(self).items()):
            descriptionString += str("{key}:{value} ".format(
                key = key,
                value = value)
            )
        return descriptionString

    ## @brief Print in a human-readable way the items of the object self
    def printout(self):
        printHR(vars(self))


## @brief JobGroup: a set of Job objects and pieces of information relevant
#  to a given set of Job objects
class JobGroup(object):

    ## @brief initialisation method
    def __init__(
        self,
        jobs = None,
        name = None,
        timeout = None
        ):
        self.jobs = jobs
        self.className = self.__class__.__name__
        self.completeStatus = False
        self.timeStampSubmission = None
        if name is None:
            self._name = uniqueIdentifier()
        else:
            self._name = name
        #self.timeStampSubmissionComplete = None #delete
        if timeout is None:
            self.timeout = 0
            for job in self.jobs:
                self.timeout += job.workFunctionTimeout
        self.results = []

    @property
    def name(self):
        return self._name

    ## @brief Return an object self-description string
    def __str__(self):
        descriptionString = ""
        for key, value in sorted(vars(self).items()):
            descriptionString += str("{key}:{value} ".format(
                key = key,
                value = value)
            )
        return descriptionString

    ## @brief Return Boolean JobGroup timeout status
    def timeoutStatus(self):
        # If the JobGroup is complete or not submitted, then it is not timed
        # out.
        if self.completeStatus is True or self.timeStampSubmission is None:
            return False
        # If the JobGroup is not complete or submitted, then it may be timed
        # out.
        elif time.time() > self.timeout + self.timeStampSubmission:
            return True
        else:
            return False

    ## @brief Print in a human-readable way the items of the object self
    def printout(self):
        printHR(vars(self))


## @brief Initialisation procedure for processes of the process pool
def initialise_processes():
    # Multiprocessing uses signals to communicate with subprocesses, so the
    # following two lines prevent the transforms signal handlers from
    # interfering:
    from PyJobTransforms.trfSignal import resetTrfSignalHandlers
    resetTrfSignalHandlers()
    signal.signal(signal.SIGINT, signal.SIG_IGN)


## @brief ParallelJobProcessor: a multiple-process processor of Job objects
class ParallelJobProcessor(object):

    ## @brief initialisation method that accepts submissions and starts pool
    def __init__(
        self,
        jobSubmission = None,
        numberOfProcesses = multiprocessing.cpu_count(), # noqa: B008 (cpu_count is constant)
        ):
        self.jobSubmission = jobSubmission
        self.numberOfProcesses = numberOfProcesses
        self.className = self.__class__.__name__
        self.status = "starting"
        msg.debug("{notifier}: status: {status}".format(
            notifier = self.className,
            status = self.status)
        )
        self.countOfJobs = None
        self.countOfRemainingJobs = 0
        self.pool = multiprocessing.Pool(
            self.numberOfProcesses,
            initialise_processes
        )
        msg.debug("{notifier}: pool of {numberOfProcesses} {units} created".format(
            notifier = self.className,
            numberOfProcesses = str(self.numberOfProcesses),
            units = units(quantity = self.numberOfProcesses,
            unitSingular = "process", unitPlural = "processes")
        ))
        self.status = "ready"
        msg.debug("{notifier}: status: {status}".format(
            notifier = self.className,
            status = self.status
        ))

    ## @brief Return an object self-description string
    def __str__(self):
        descriptionString = ""
        for key, value in sorted(vars(self).items()):
            descriptionString += str("{key}:{value} ".format(
                key = key,
                value = value
            ))
        return descriptionString

    ## @brief Print in a human-readable way the items of the object self
    def printout(self):
        printHR(vars(self))

    ## @brief Submit a Job object or a JobGroup object for processing
    def submit(
        self,
        jobSubmission = None
        ):
        # If the input submission is not None, then update the jobSubmission
        # data attribute to that specified for this method.
        if jobSubmission is not None:
            self.jobSubmission = jobSubmission
        self.status = "submitted"
        msg.debug("{notifier}: status: {status}".format(
            notifier = self.className,
            status = self.status
        ))
        # If the input submission is a Job object, contain it in a JobGroup
        # object.
        if isinstance(self.jobSubmission, Job):
            jobGroup = JobGroup(
                jobs = [self.jobSubmission,],
            )
            self.jobSubmission = jobGroup
        # Count the number of jobs.
        self.countOfJobs = len(self.jobSubmission.jobs)
        self.countOfRemainingJobs = self.countOfJobs
        # Build a contemporary list of the names of jobs.
        self.listOfNamesOfRemainingJobs = []
        for job in self.jobSubmission.jobs:
            self.listOfNamesOfRemainingJobs.append(job.name)
        msg.debug("{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
            notifier = self.className,
            name = self.jobSubmission.name,
            countOfJobs = self.countOfJobs,
            units = units(
                quantity = self.countOfRemainingJobs,
                unitSingular = "job",
                unitPlural = "jobs"
            )
        ))
        msg.debug(self.statusReport())
        msg.debug("{notifier}: submitting job group submission '{name}' to pool".format(
            notifier = self.className,
            name = self.jobSubmission.name
        ))
        # Cycle through all jobs in the input submission and apply each to the
        # pool.
        for job in self.jobSubmission.jobs:
            job.timeStampSubmission = time.time()
            msg.debug("{notifier}: job '{name}' submitted to pool".format(
                notifier = self.className,
                name = job.name
            ))
            # Apply the job to the pool, attaching the returned pool.ApplyResult
            # object to the job as a data attribute.
            job.resultGetter = self.pool.apply_async(
                func = job.workFunction,
                kwds = job.workFunctionKeywordArguments
            )
        # Prepare monitoring of job group times in order to detect a job group
        # timeout by recording the time of complete submission of the job group.
        self.jobSubmission.timeStampSubmission = time.time()
        msg.debug("{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
            notifier = self.className,
            countOfJobs = self.countOfJobs,
            units = units(
                quantity = self.countOfJobs,
                unitSingular = "job",
                unitPlural = "jobs"
            ),
            timeStampSubmission = self.jobSubmission.timeStampSubmission
        ))
        self.status = "processing"
        msg.debug("{notifier}: status: {status}".format(
            notifier = self.className,
            status = self.status
        ))
        return 0


    ## @brief Get results of a JobGroup object submission
    def getResults(self):
        # While the number of jobs remaining is greater than zero, cycle over
        # all jobs in the JobGroup object submission, watching for a
        # timeout of the JobGroup object submission. If a result has not been
        # retrieved for a job (i.e. the Job object does not have a result data
        # attribute), then check if a result is available for the job (using the
        # method multiprocessing.pool.AsyncResult.ready()). If a result is
        # available for the job, then check if the job has run successfully
        # (using the method multiprocessing.pool.AsyncResult.successful()). If
        # the job has not been successful, raise an exception, otherwise, get
        # the result of the job and save it to the result data attribute of the
        # job.
        msg.debug("{notifier}: checking for job {units}".format(
            notifier = self.className,
            units = units(
                quantity = self.countOfRemainingJobs,
                unitSingular = "result",
                unitPlural = "results")
            )
        )
        while self.countOfRemainingJobs > 0:
            # Check for timeout of the job group. If the current timestamp is
            # greater than the job group timeout (derived from the sum of the
            # set of all job timeout specifications in the job group) + the job
            # group submission timestamp, then raise an exception, otherwise
            # cycle over all jobs.
            # Allow time for jobs to complete.
            time.sleep(0.25)
            if self.jobSubmission.timeoutStatus():
                msg.error("{notifier}: job group '{name}' timed out".format(
                    notifier = self.className,
                    name = self.jobSubmission.name
                ))
                self._abort()
                exceptionMessage = "timeout of a function in list {listOfNamesOfRemainingJobs}".format(
                    listOfNamesOfRemainingJobs = self.listOfNamesOfRemainingJobs
                )
                msg.error("{notifier}: exception message: {exceptionMessage}".format(
                    notifier = self.className,
                    exceptionMessage = exceptionMessage
                ))
                raise trfExceptions.TransformTimeoutException(
                    trfExit.nameToCode('TRF_EXEC_TIMEOUT'),
                    exceptionMessage
                )
            else:
                # Maintain a contemporary list of the names of remaining jobs.
                self.listOfNamesOfRemainingJobs = []
                for job in self.jobSubmission.jobs:
                    if not hasattr(job, 'result'):
                        self.listOfNamesOfRemainingJobs.append(job.name)
                        # If the result of the job is ready...
                        if job.resultGetter.ready():
                            msg.debug(
                                "{notifier}: result ready for job '{name}'".format(
                                    notifier = self.className,
                                    name = job.name
                                )
                            )
                            job.successStatus = job.resultGetter.successful()
                            msg.debug(
                                "{notifier}: job '{name}' success status: {successStatus}".format(
                                    notifier = self.className,
                                    name = job.name,
                                    successStatus = job.successStatus
                                )
                            )
                            # If the job was successful, create the result data
                            # attribute of the job and save the result to it.
                            if job.successStatus:
                                job.result = job.resultGetter.get()
                                msg.debug(
                                    "{notifier}: result of job '{name}': {result}".format(
                                        notifier = self.className,
                                        name = job.name,
                                        result = job.result
                                    )
                                )
                                self.countOfRemainingJobs -= 1
                                msg.debug(
                                    "{notifier}: {countOfRemainingJobs} {units} remaining".format(
                                        notifier = self.className,
                                        countOfRemainingJobs = self.countOfRemainingJobs,
                                        units = units(
                                            quantity = self.countOfRemainingJobs,
                                            unitSingular = "job",
                                            unitPlural = "jobs"
                                        )
                                    )
                                )
                            # If the job was not successful, raise an exception
                            # and abort processing.
                            elif not job.successStatus:
                                msg.error(
                                    "{notifier}: job '{name}' failed".format(
                                        notifier = self.className,
                                        name = job.name
                                    )
                                )
                                self._abort()
                                exceptionMessage = "failure of function '{name}' with arguments {arguments}".format(
                                    name = job.workFunction.__name__,
                                    arguments = job.workFunctionKeywordArguments
                                )
                                msg.error("{notifier}: exception message: {exceptionMessage}".format(
                                    notifier = self.className,
                                    exceptionMessage = exceptionMessage
                                ))
                                raise trfExceptions.TransformExecutionException(
                                    trfExit.nameToCode('TRF_EXEC_FAIL'),
                                    exceptionMessage
                                )
        # All results having been returned, create the 'results' list data
        # attribute of the job group and append all individual job results to
        # it.
        self.jobSubmission.timeStampComplete = time.time()
        self.jobSubmission.completeStatus = True
        msg.debug("{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
            notifier = self.className,
            countOfJobs = self.countOfJobs,
            units = units(
                quantity = self.countOfJobs,
                unitSingular = "job",
                unitPlural = "jobs"
            ),
            timeStampComplete = self.jobSubmission.timeStampComplete
        ))
        self.jobSubmission.processingTime = self.jobSubmission.timeStampComplete - self.jobSubmission.timeStampSubmission
        msg.debug("{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
            notifier = self.className,
            countOfJobs = self.countOfJobs,
            units = units(
                quantity = self.countOfJobs,
                unitSingular = "job",
                unitPlural = "jobs"
            ),
            processingTime = self.jobSubmission.processingTime
        ))
        for job in self.jobSubmission.jobs:
            self.jobSubmission.results.append(job.result)
        self._terminate()
        return self.jobSubmission.results


    ## @brief Return a status report string
    def statusReport(self):
        statusReport = "\n{notifier}:\n status report:".format(
            notifier = self.className
        )
        # information on parallel job processor
        statusReport += "\n parallel job processor configuration:"
        statusReport += "\n status: {notifier}".format(
            notifier = str(self.status)
        )
        statusReport += "\n number of processes: {notifier}".format(
            notifier = str(self.numberOfProcesses)
        )
        # information on job group submission
        statusReport += "\n job group submission: '{notifier}'".format(
            notifier = self.jobSubmission.name
        )
        statusReport += "\n total number of jobs: {notifier}".format(
            notifier = str(self.countOfJobs)
        )
        statusReport += "\n number of incomplete jobs: {notifier}".format(
            notifier = str(self.countOfRemainingJobs)
        )
        statusReport += "\n names of incomplete jobs: {notifier}".format(
            notifier = self.listOfNamesOfRemainingJobs
        )
        # information on jobs (if existent)
        if self.jobSubmission.jobs:
            statusReport += "\n jobs:"
            for job in self.jobSubmission.jobs:
                statusReport += "\n job '{name}':".format(
                    name = job.name
                )
                statusReport += "\n workFunction: '{name}'".format(
                    name = job.workFunction.__name__
                )
                statusReport += "\n workFunctionKeywordArguments: '{arguments}'".format(
                    arguments = job.workFunctionKeywordArguments
                )
                statusReport += "\n workFunctionTimeout: '{timeout}'".format(
                    timeout = job.workFunctionTimeout
                )
                if hasattr(job, 'result'):
                    statusReport += "\n result: '{result}'".format(
                        result = job.result
                    )
        # statistics of parallel job processor run
        if hasattr(self.jobSubmission, 'processingTime'):
            statusReport += "\n statistics:"
            statusReport += "\n total processing time: {processingTime} s".format(
                processingTime = self.jobSubmission.processingTime
            )
        return statusReport

    ## @brief Abort parallel job processor
    def _abort(self):
        self.status = "aborting"
        msg.debug("{notifier}: status: {status}".format(
            notifier = self.className,
            status = self.status
        ))
        self._terminate()

    ## @brief Terminate parallel job processor
    def _terminate(self):
        self.status = "terminating"
        msg.debug("{notifier}: status: {status}".format(
            notifier = self.className,
            status = self.status
        ))
        msg.debug("{notifier}: terminating pool of {numberOfProcesses} {units}".format(
            notifier = self.className,
            numberOfProcesses = str(self.numberOfProcesses),
            units = units(
                quantity = self.numberOfProcesses,
                unitSingular = "process",
                unitPlural = "processes"
            )
        ))
        self.pool.terminate()
        self.pool.join()
        self.status = "finished"
        msg.debug("{notifier}: status: {status}".format(
            notifier = self.className,
            status = self.status
        ))
        msg.debug(self.statusReport())
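
# Illustrative sketch (editorial addition): wrap a module-level work function
# in a Job, process it in the pool and collect the results:
#
#   def square(n=1):
#       return n * n
#
#   pjp = ParallelJobProcessor()
#   pjp.submit(Job(workFunction=square,
#                  workFunctionKeywordArguments={'n': 3},
#                  workFunctionTimeout=60))
#   results = pjp.getResults()   # -> [9]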


## @brief Analytics service class
class analytic():

    _fit = None

    def __init__(self, **kwargs):
        self._fit = None

    ## @brief Fitting function
    #  For a linear model: y(x) = slope * x + intersect
    def fit(self, x, y, model='linear'):
        try:
            self._fit = Fit(x=x, y=y, model=model)
        except Exception as e:
            msg.warning('fit failed! {0}'.format(e))

        return self._fit

    # Return the slope of a linear fit, y(x) = slope * x + intersect
    def slope(self):
        slope = None

        if self._fit:
            slope = self._fit.slope()
        else:
            msg.warning('Fit has not been defined')

        return slope

    # Return a properly formatted job metrics string with analytics data.
    # Currently the function returns a fit for 'pss' vs 'time', whose slope measures memory leaks.
    # @param filename: memory monitor output file (string).
    # @param x_name: optional string, name selector for table column.
    # @param y_name: optional string, name selector for table column.
    # @param precision: optional precision for fitted slope parameter, default 2.
    # @param tails: should tails be used? (boolean).
    # @param minPoints: minimum desired number of data points to be fitted (after removing the tails)
    # @return: {"slope": slope, "chi2": chi2}
    def getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5):
        _memFileToTable = memFileToTable()
        fitResult = {}
        table = _memFileToTable.getTable(filename, header=None, separator="\t")
        if table:
            # extract data to be fitted
            x, y = self.extractFromTable(table, x_name, y_name)
            # remove tails if desired
            # this is useful e.g. for memory monitor data where the first and last
            # values represent allocation and de-allocation, i.e. not interesting
            # here the tail is defined to be the first and last 20% of the data
            if not tails:
                tail = int(len(x)/5)
                msg.info('removing tails from the memory monitor data; 20% from each side')
                x = x[tail:]
                x = x[:-tail]
                y = y[tail:]
                y = y[:-tail]

            if len(x) == len(y) and len(x) > minPoints:
                msg.info('fitting {0} vs {1}'.format(y_name, x_name))
                try:
                    fit = self.fit(x, y)
                    _slope = self.slope()
                except Exception as e:
                    msg.warning('failed to fit data, x={0}, y={1}: {2}'.format(x, y, e))
                else:
                    if _slope:
                        slope = round(fit.slope(), precision)
                        chi2 = round(fit.chi2(), precision)
                        fitResult = {"slope": slope, "chi2": chi2}
                        if slope:
                            HRslope, unit = self.formatBytes(slope)
                            msg.info('slope of the fitted line: {0} {1} (using {2} data points, chi2={3})'.format(HRslope, unit, len(x), chi2))
            else:
                msg.warning('wrong length of table data, x={0}, y={1} (lengths must be equal and longer than {2})'.format(x, y, minPoints))

        return fitResult

    # Extract the wanted columns, e.g. x: Time, y: pss+swap
    # @param x_name: column name to be extracted (string).
    # @param y_name: column name to be extracted (may contain a '+'-sign) (string).
    # @return: x (list), y (list).
    def extractFromTable(self, table, x_name, y_name):
        headerUpperVersion = {'pss':'PSS', 'swap':'Swap', 'rss':'RSS', 'vmem':'VMEM'}
        x = table.get(x_name, [])
        if '+' not in y_name:
            y = table.get(y_name, [])
            if len(y) == 0:
                y = table.get(headerUpperVersion[y_name], [])
        else:
            try:
                y1_name = y_name.split('+')[0]
                y2_name = y_name.split('+')[1]
                y1_value = table.get(y1_name, [])
                y2_value = table.get(y2_name, [])
                if len(y1_value) == 0 or len(y2_value) == 0:
                    y1_value = table.get(headerUpperVersion[y1_name], [])
                    y2_value = table.get(headerUpperVersion[y2_name], [])
            except Exception as e:
                msg.warning('exception caught: {0}'.format(e))
                x = []
                y = []
            else:
                # create a new list with element-wise sums: (1,2,3) + (4,5,6) = (5,7,9)
                y = [x0 + y0 for x0, y0 in zip(y1_value, y2_value)]

        return x, y

    # Make the result of slope() human readable (HR); the default unit is kB.
    def formatBytes(self, size):
        # decimal system
        power = 1000
        n = 1
        power_labels = {1: 'K', 2: 'M', 3: 'G', 4: 'T'}
        while size > power:
            size /= power
            n += 1
        return round(size, 2), power_labels[n]+'B/s'


## @brief Low-level fitting class
class Fit():
    _model = 'linear'   # fitting model
    _x = None           # x values
    _y = None           # y values
    _xm = None          # x mean
    _ym = None          # y mean
    _ss = None          # sum of square deviations
    _ss2 = None         # sum of deviations
    _slope = None       # slope
    _intersect = None   # intersect
    _chi2 = None        # chi2

    def __init__(self, **kwargs):
        # extract parameters
        self._model = kwargs.get('model', 'linear')
        self._x = kwargs.get('x', None)
        self._y = kwargs.get('y', None)
        self._math = math()

        if not self._x or not self._y:
            msg.warning('input data not defined')

        if len(self._x) != len(self._y):
            msg.warning('input data (lists) have different lengths')

        # base calculations
        if self._model == 'linear':
            self._ss = self._math.sum_square_dev(self._x)
            self._ss2 = self._math.sum_dev(self._x, self._y)
            self.set_slope()
            self._xm = self._math.mean(self._x)
            self._ym = self._math.mean(self._y)
            self.set_intersect()
            self.set_chi2()

        else:
            msg.warning("'{0}' model is not implemented".format(self._model))

    def fit(self):
        # Return the fit object itself.
        return self

    def value(self, t):
        # Return the value y(x=t) of a linear fit y(x) = slope * x + intersect.
        return self._slope * t + self._intersect

    def set_chi2(self):
        # Calculate and set the chi2 value.
        y_observed = self._y
        y_expected = []
        for x in self._x:
            y_expected.append(self.value(x))
        if y_observed and y_expected:
            self._chi2 = self._math.chi2(y_observed, y_expected)
        else:
            self._chi2 = None

    def chi2(self):
        # Return the chi2 value.
        return self._chi2

    def set_slope(self):
        # Calculate and set the slope of the linear fit.
        if self._ss2 and self._ss and self._ss != 0:
            self._slope = self._ss2 / self._ss
        else:
            self._slope = None

    def slope(self):
        # Return the slope value.
        return self._slope

    def set_intersect(self):
        # Calculate and set the intersect of the linear fit.
        if self._ym and self._slope and self._xm:
            self._intersect = self._ym - self._slope * self._xm
        else:
            self._intersect = None

    def intersect(self):
        # Return the intersect value.
        return self._intersect


## @brief Some mathematical tools
class math():

    # Return the sample arithmetic mean of data.
    def mean(self, data):
        n = len(data)
        if n < 1:
            msg.warning('mean requires at least one data point')
        return sum(data)/n

    # Return the sum of square deviations of sequence data:
    # sum (x - x_mean)**2
    def sum_square_dev(self, data):
        c = self.mean(data)
        return sum((x - c) ** 2 for x in data)

    # Return the sum of deviations of sequences x and y:
    # sum (x - x_mean)*(y - y_mean)
    def sum_dev(self, x, y):
        c1 = self.mean(x)
        c2 = self.mean(y)
        return sum((_x - c1) * (_y - c2) for _x, _y in zip(x, y))

    # Return the chi2 sum of the provided observed and expected values.
    def chi2(self, observed, expected):
        if 0 in expected:
            return 0.0
        return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected))
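
# Illustrative sketch (editorial addition): fit a straight line through four
# points and read back slope, intersect and chi2:
#
#   f = Fit(x=[1, 2, 3, 4], y=[2.1, 3.9, 6.2, 7.8], model='linear')
#   f.slope(), f.intersect(), f.chi2()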


## @brief Extract a table of data from a txt file
class memFileToTable(object):

    def getTable(self, filename, header=None, separator="\t"):
        tabledict = {}
        keylist = []
        try:
            f = open(filename, 'r')
        except Exception as e:
            msg.warning("failed to open file: {0}, {1}".format(filename, e))
        else:
            firstline = True
            for line in f:
                fields = line.split(separator)
                fields = [''.join(filter(str.isprintable, field)) for field in fields]
                if firstline:
                    firstline = False
                    tabledict, keylist = self._defineTableDictKeys(header, fields, separator)
                    if not header:
                        continue
                # from now on, fill the dictionary fields with the input data
                i = 0
                for field in fields:
                    # get the corresponding dictionary key from the keylist
                    key = keylist[i]
                    # store the field value in the correct list
                    tabledict[key].append(float(field))
                    i += 1
            f.close()

        return tabledict

    ## @brief Define the keys for the tabledict dictionary
    def _defineTableDictKeys(self, header, fields, separator):
        tabledict = {}
        keylist = []

        if not header:
            # get the dictionary keys from the header of the file
            for key in fields:
                # first line defines the header, whose elements will be used as dictionary keys
                if key == '':
                    continue
                if key.endswith('\n'):
                    key = key[:-1]
                tabledict[key] = []
                keylist.append(key)
        else:
            # get the dictionary keys from the provided header
            keys = header.split(separator)
            for key in keys:
                if key == '':
                    continue
                if key.endswith('\n'):
                    key = key[:-1]
                tabledict[key] = []
                keylist.append(key)

        return tabledict, keylist


## @brief Return a Valgrind command for use with Athena
def ValgrindCommand(
    defaultOptions = True,
    extraOptionsList = None,
    AthenaSerialisedConfigurationFile = "athenaConf.pkl",
    returnFormat = "string"
    ):

    # Access Valgrind suppressions files by finding the paths from
    # environment variables. Append the files to the Valgrind suppressions
    # options.
    suppressionFilesAndCorrespondingPathEnvironmentVariables = {
        "etc/valgrind-root.supp": "ROOTSYS",
        "Gaudi.supp": "DATAPATH",
        "oracleDB.supp": "DATAPATH",
        "valgrindRTT.supp": "DATAPATH",
        "root.supp": "DATAPATH"
    }
    optionsList = ["valgrind"]
    # If default options are not suppressed, use them.
    if defaultOptions:
        optionsList.append("--num-callers=30")
        optionsList.append("--tool=memcheck")
        optionsList.append("--leak-check=full")
        optionsList.append("--smc-check=all")
    # If extra options are specified, append them to the existing options.
    if extraOptionsList:
        for option in extraOptionsList:
            optionsList.append(option)
    # Add suppression files and athena commands
    for suppressionFile, pathEnvironmentVariable in suppressionFilesAndCorrespondingPathEnvironmentVariables.items():
        suppFile = findFile(os.environ[pathEnvironmentVariable], suppressionFile)
        if suppFile:
            optionsList.append("--suppressions=" + suppFile)
        else:
            msg.warning("Bad path to suppression file: {sfile}, {path} not defined".format(
                sfile = suppressionFile, path = pathEnvironmentVariable)
            )
    optionsList.append("$(which python)")
    optionsList.append("$(which athena.py)")
    optionsList.append(AthenaSerialisedConfigurationFile)
    # Return the command in the requested format, string (by default) or list.
    if returnFormat is None or returnFormat == "string":
        return(" ".join(optionsList))
    elif returnFormat == "list":
        return(optionsList)
    else:
        print(
            "error: invalid Valgrind command format request (requested " +
            "format: {format}; valid formats: string, list)".format(
                format = returnFormat
            ))
        raise(Exception)

## @brief Return a VTune command for use with Athena
def VTuneCommand(
    defaultOptions = True,
    extraOptionsList = None,
    AthenaCommand = ["athena.py", "athenaConf.pkl"],
    returnFormat = "string"
    ):

    # Build the VTune command line. setsid prevents VTune from killing the
    # entire job / terminal session when it finishes.
    optionsList = ["setsid", "vtune"]
    # If default options are not suppressed, use them.
    if defaultOptions:
        optionsList.append("-run-pass-thru=--no-altstack")
        optionsList.append("-mrte-mode=native")
    # If extra options are specified, append them to the existing options.
    isCollectSpecified = False
    if extraOptionsList:
        for option in extraOptionsList:
            optionsList.append(option)
            if option.startswith("-collect="):
                isCollectSpecified = True
    if not isCollectSpecified:
        optionsList.append("-collect=hotspots")
    optionsList.append("--")
    optionsList.extend(AthenaCommand)
    # Return the command in the requested format, string (by default) or list.
    if returnFormat is None or returnFormat == "string":
        return(" ".join(optionsList))
    elif returnFormat == "list":
        return(optionsList)
    else:
        print(
            "error: invalid VTune command format request (requested " +
            "format: {format}; valid formats: string, list)".format(
                format = returnFormat
            ))
        raise(Exception)

# Calculate CPU time from a pair of os.times() tuples
def calcCpuTime(start, stop):
    cpuTime = None
    if start and stop:
        cpuTime = reduce(lambda x1, x2: x1+x2, list(map(lambda x1, x2: x2-x1, start[2:4], stop[2:4])))

    return cpuTime

# Calculate wall time from a pair of os.times() tuples
def calcWallTime(start, stop):
    wallTime = None
    if start and stop:
        wallTime = stop[4] - start[4]

    return wallTime
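
# Illustrative sketch (editorial addition): both helpers expect os.times()
# tuples, using the child CPU fields [2:4] and the elapsed-time field [4]:
#
#   before = os.times()
#   rc = call(['sleep', '1'])
#   after = os.times()
#   cpu, wall = calcCpuTime(before, after), calcWallTime(before, after)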

def bind_port(host, port):
    ret = 0
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
    except socket.error as e:
        if e.errno == 98:
            print("Port %s is already in use" % port)
        else:
            # something else raised the socket.error exception
            print(e)
        ret = 1
    s.close()
    return ret


## @brief Summarise events passed by the ISF_SimEventFilter by summing the
#  accepted events reported in the log file
def reportEventsPassedSimFilter(log):

    # Currently the pattern carrying the information on passed events looks like:
    # ISF_SimEventFilter INFO accepted 1 out of 10 events for filter ISF_SimEventFilter (SimEventFilter)
    # If the filter name is truncated to e.g. ISF_SimEventFi... or
    # ISF_SimEventFil... due to long timestamps, the pattern still matches.
    regExp = re.compile(r'ISF_SimEventFi[lter|...]+\s.*INFO.*accepted\s*(?P<events>[0-9]*)\s*out of\s*(?P<total>[0-9]*).*')
    try:
        myGen = lineByLine(log)
    except IOError as e:
        msg.warning('Failed to open transform logfile {0}: {1:s}'.format(log, e))

    resimevents = None
    passed_events = 0
    total_events = 0
    for line, lineCounter in myGen:
        m = regExp.match(line)
        if m:
            passed_events += int(m.group('events'))
            total_events += int(m.group('total'))
            resimevents = passed_events

    if resimevents is not None:
        msg.info("Summary of events passed by the ISF_SimEventFilter: {0} out of {1} total events".format(passed_events, total_events) )
    else:
        msg.warning("Returning null value for resimevents: no line matched the regExp for extracting events passed by the ISF_SimEventFilter")

    return resimevents