ATLAS Offline Software
Loading...
Searching...
No Matches
trfUtils.py
Go to the documentation of this file.
1# Copyright (C) 2002-2025 CERN for the benefit of the ATLAS collaboration
2
6
7import os
8import os.path as path
9import re
10import signal
11import sys
12import tarfile
13import time
14import uuid
15import socket
16
17import multiprocessing
18import multiprocessing.pool
19
20import base64
21
22from datetime import datetime
23from subprocess import Popen, STDOUT, PIPE
24from xml.dom import minidom
25from xml.parsers.expat import ExpatError
26from xml.etree import ElementTree
27
28from PyJobTransforms.trfExitCodes import trfExit
29import PyJobTransforms.trfExceptions as trfExceptions
30
31import logging
32from functools import reduce
33msg = logging.getLogger(__name__)
34
35
36
def findFile(pathvar, fname):
    '''Find a file within a colon-separated path string, cf. $PATH lookup.

    An absolute @c fname (leading "/") is returned as-is, without searching.
    @return Full path of the first match, or None if nothing was found.
    '''
    msg.debug('Finding full path for {fileName} in path {path}'.format(
        fileName = fname,
        path = pathvar
    ))
    # Absolute paths need no search.
    if fname.startswith('/'):
        return fname

    # Probe each element of the search path in turn.
    for pathElement in pathvar.split(':'):
        candidate = path.join(pathElement, fname)
        if path.exists(candidate):
            return candidate

    return None
55
56
57
def getAncestry(listMyOrphans = False):
    '''Run @c ps to build a {parent PID: [child PIDs]} map of the process tree.

    @param listMyOrphans If True, processes whose parent is init (PID 1) but
           which share our process group are counted as our own children.
    @return Dictionary mapping each parent PID to the list of its child PIDs.
    '''
    psCmd = ['ps', 'ax', '-o', 'pid,ppid,pgid,args', '-m']

    try:
        msg.debug('Executing %s', psCmd)
        p = Popen(psCmd, stdout=PIPE, stderr=PIPE)
        stdout = p.communicate()[0]
        psPID = p.pid
    except OSError as e:
        msg.error('Failed to execute "ps" to get process ancestry: %s', repr(e))
        raise

    childDict = {}
    myPgid = os.getpgrp()
    myPid = os.getpid()
    for psLine in stdout.decode().split('\n'):
        fields = psLine.split(None, 3)
        if len(fields) != 4:
            # Not a nice line
            continue
        try:
            pid = int(fields[0])
            ppid = int(fields[1])
            pgid = int(fields[2])
        except ValueError:
            # Header or otherwise unparsable line
            continue
        cmd = fields[3]
        # Ignore the ps process itself
        if pid == psPID:
            continue
        childDict.setdefault(ppid, []).append(pid)
        if listMyOrphans and ppid == 1 and pgid == myPgid:
            msg.info("Adding PID {0} to list of my children as it seems to be orphaned: {1}".format(pid, cmd))
            childDict.setdefault(myPid, []).append(pid)

    return childDict
103
104
def listChildren(psTree = None, parent = os.getpid(), listOrphans = False): # noqa: B008 (PID is constant)
    '''Take a psTree dictionary and list all children'''
    # Build the tree lazily if the caller did not supply one.
    if psTree is None:
        psTree = getAncestry(listMyOrphans = listOrphans)

    msg.debug("List children of %d (%s)", parent, psTree.get(parent, []))
    # Collect direct children first, then each child's own descendants;
    # the final reverse yields deepest descendants first.
    offspring = []
    if parent in psTree:
        offspring += psTree[parent]
        for childPid in psTree[parent]:
            offspring += listChildren(psTree, childPid)
    offspring.reverse()
    return offspring
124
125
126
def infanticide(childPIDs = None, sleepTime = 3, message = True, listOrphans = False):
    '''Kill the given child processes: SIGTERM first, then SIGKILL.

    @param childPIDs PIDs to kill; when None the current process's children
           are discovered via listChildren()
    @param sleepTime Grace period (seconds) between SIGTERM and SIGKILL
    @param message If True, log the list of processes about to be killed
    @param listOrphans Passed through to listChildren() when discovering PIDs
    '''
    if childPIDs is None:
        childPIDs = listChildren(listOrphans = listOrphans)

    if len(childPIDs) > 0 and message:
        msg.info('Killing these child processes: {0}...'.format(childPIDs))

    def _signalAll(sig):
        # An OSError means the process no longer exists - harmless.
        for pid in childPIDs:
            try:
                os.kill(pid, sig)
            except OSError:
                pass

    _signalAll(signal.SIGTERM)
    time.sleep(sleepTime)
    _signalAll(signal.SIGKILL)
155
156
def call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10):
    '''Execute a subprocess, relaying its merged stdout/stderr to a logger.

    Most parameters are passed straight through to subprocess.Popen; stdout
    and stderr are always captured (stderr folded into stdout).

    @param message Prefix prepended to every logged line of output
    @param logger Destination logger; if None, lines are print()ed instead
    @param loglevel Level used for output lines (defaults to logging.DEBUG)
    @param timeout Seconds allowed per attempt; None or <= 0 disables the
           timeout/retry machinery entirely and just waits for completion
    @param retry Number of additional attempts allowed after a timeout
    @param timefactor Factor by which @c timeout and @c sleeptime are scaled
           after each timed-out attempt
    @param sleeptime Seconds to sleep between attempts
    @return Return code of the subprocess
    @raises Exception When every timeout-limited attempt has timed out
    '''

    def logProc(p):
        # Relay a single line of subprocess output to the logger (or stdout).
        # NOTE(review): with universal_newlines=False p.stdout yields bytes,
        # so the logged line is then a bytes repr - confirm callers expect this.
        line=p.stdout.readline()
        if line:
            line="%s%s" % (message, line.rstrip())
            if logger is None:
                print(line)
            else:
                logger.log(loglevel, line)

    def flushProc(p):
        # Drain any output still buffered after the process has finished.
        line=p.stdout.readline()
        while line:
            line="%s%s" % (message, line.strip())
            if logger is None:
                print(line)
            else:
                logger.log(loglevel, line)
            line=p.stdout.readline()

    if loglevel is None:
        loglevel=logging.DEBUG

    if timeout is None or timeout<=0: # no timeout set
        msg.info('Executing %s...', args)
        starttime = time.time()
        p=Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
        # Stream output while the process runs, then drain the remainder.
        while p.poll() is None:
            logProc(p)
        flushProc(p)
        if timeout is not None:
            msg.info('Executed call within %d s.', time.time()-starttime)
        return p.returncode

    else: #timeout set
        n=0
        while n<=retry:
            msg.info('Try %i out of %i (time limit %ss) to call %s.', n+1, retry+1, timeout, args)
            starttime = time.time()
            endtime=starttime+timeout
            p=Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
            # Stream output until the process ends or the deadline passes.
            while p.poll() is None and time.time()<endtime:
                logProc(p)
            if p.poll() is None:
                # Deadline passed with the process still alive: kill the
                # whole subprocess tree, then retry with a longer timeout.
                msg.warning('Timeout limit of %d s reached. Kill subprocess and its children.', timeout)
                parent=p.pid
                pids=[parent]
                pids.extend(listChildren(parent=parent))
                infanticide(pids)
                msg.info('Checking if something is left in buffer.')
                flushProc(p)
                if n!=retry:
                    msg.info('Going to sleep for %d s.', sleeptime)
                    time.sleep(sleeptime)
                n+=1
                # Give the next attempt more headroom.
                timeout*=timefactor
                sleeptime*=timefactor
            else:
                # Finished in time: drain remaining output and return.
                flushProc(p)
                msg.info('Executed call within %d s.', time.time()-starttime)
                return p.returncode

        msg.warning('All %i tries failed!', n)
        raise Exception
222
223
224
226 setupMsg = str()
227 eVars = ['AtlasBaseDir', 'AtlasProject', 'AtlasVersion', 'AtlasPatch', 'AtlasPatchVersion', 'CMTCONFIG', 'TestArea']
228 if "AtlasProject" in os.environ:
229 CMake_Platform = "{0}_PLATFORM".format(os.environ["AtlasProject"])
230 if CMake_Platform in os.environ:
231 eVars.remove("CMTCONFIG")
232 eVars.append(CMake_Platform)
233 for eVar in eVars:
234 if eVar in os.environ:
235 setupMsg += '\t%s=%s\n' % (eVar, os.environ[eVar])
236 # Look for patches so that the job can be rerun
237 if 'WorkDir_DIR' in os.environ and os.access(os.environ['WorkDir_DIR'], os.R_OK):
238 pass
239 # lstags is obsolete with git releases.
240 # setupMsg += "\n\tPatch packages are:\n"
241 # try:
242 # cmd = ['lstags']
243 # lstagsOut = Popen(cmd, shell = False, stdout = PIPE, stderr = STDOUT, bufsize = 1).communicate()[0]
244 # setupMsg += "\n".join([ "\t\t{0}".format(pkg) for pkg in lstagsOut.decode().split("\n") ])
245 # except (CalledProcessError, OSError) as e:
246 # setupMsg += 'Execution of lstags failed: {0}'.format(e)
247 else:
248 setupMsg+= "No readable patch area found"
249
250 return setupMsg.rstrip()
251
252
253
def releaseIsOlderThan(major, minor=None):
    '''Test if the current athena release is older than major(.minor).

    The release number is taken from $AtlasVersion, falling back to the last
    path element of $AtlasBaseDir.
    @return True when the detected release is older; False when it is not
            older or when no release could be determined.
    '''
    if 'AtlasVersion' not in os.environ or 'AtlasBaseDir' not in os.environ:
        msg.warning("Could not find 'AtlasVersion' and 'AtlasBaseDir' in the environment - no release match possible")
        return False
    try:
        # First try AtlasVersion, which is clean
        relRegExp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
        relMatch = relRegExp.match(os.environ['AtlasVersion'])
        if relMatch is None:
            # Now try the final part of AtlasBaseDir
            leafDir = path.basename(os.environ['AtlasBaseDir'])
            relMatch = relRegExp.match(leafDir)
            if relMatch is None:
                msg.info('No identifiable numbered release found from AtlasVersion or AtlasBaseDir - assuming dev/devval/mig')
                return False

        relmajor = int(relMatch.group('major'))
        relminor = int(relMatch.group('minor'))
        msg.info('Detected release major {0}, minor {1} (.{2}) from environment'.format(relmajor, relminor, relMatch.group('other')))

        # Major beats minor, so test this first
        if relmajor != major:
            return relmajor < major
        # Equal major release: only older when a minor bound is given
        # and ours falls below it.
        return minor is not None and relminor < minor

    except Exception as e:
        msg.warning('Exception thrown when attempting to detect athena version ({0}). No release check possible'.format(e))
        return False
297
298
299
def asetupReleaseIsOlderThan(asetup_string, major, minor=None):
    '''Test if the release in an asetup string is older than major(.minor).

    A "master" component always counts as newest. Otherwise the first
    component matching major.minor.bugfix, then bare major.minor, is used.
    @return True when the parsed release is older; False when it is not
            older or when no release could be parsed.
    '''
    try:
        relmajor = None
        relminor = None

        # First split the asetup_string by comma
        split_string = asetup_string.split(',')
        # master is always the newest
        if 'master' in split_string:
            return False

        # Try the more specific major.minor.bugfix pattern first,
        # then fall back to bare major.minor.
        for reg_exp, withBugfix in (
                (re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)'), True),
                (re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)'), False)):
            for part in split_string:
                match = reg_exp.match(part.strip())
                if match:
                    relmajor = int(match.group('major'))
                    relminor = int(match.group('minor'))
                    if withBugfix:
                        msg.info('Detected asetup release {0}.{1}(.{2})'.format(relmajor, relminor, match.group('other')))
                    else:
                        msg.info('Detected asetup release {0}.{1}'.format(relmajor, relminor))
                    break
            if relmajor is not None:
                break

        # Bail out
        if relmajor is None:
            raise RuntimeError('asetup version could not be parsed')

        # Major beats minor, so test this first
        if relmajor != major:
            return relmajor < major
        # Equal major: only older when a minor bound is given and we are below it
        return minor is not None and relminor < minor

    except Exception as e:
        msg.warning('Exception thrown when attempting to detect asetup athena version ({0}) from {1}. No release check possible'.format(e, asetup_string))
        return False
357
358
359
def shQuoteStrings(strArray = sys.argv):
    '''Single-quote each string for safe reuse on a shell command line,
    escaping any embedded single quotes.'''
    quoted = []
    for qstring in strArray:
        quoted.append("'" + qstring.replace("'", "\\'") + "'")
    return quoted
365
366
367
def lineByLine(filename, strip=True, removeTimestamp=True, substepName=None):
    '''Generator yielding (line, lineNumber) tuples from a text file.

    @param filename File to read (UTF-8)
    @param strip If True, strip surrounding whitespace from each line
    @param removeTimestamp If True, strip leading timestamp characters
    @param substepName If given (a str), remove it as a prefix from each line
    @yield Tuple of (processed line, 1-based line number)
    '''
    linecounter = 0
    # 'with' guarantees the file is closed even if the consumer abandons
    # the generator early (the previous explicit close() never ran then).
    with open(filename, 'r', encoding='utf8') as f:
        for line in f:
            linecounter += 1
            if substepName and isinstance(substepName, str): # Remove substepName only if caller provides that string.
                # N.B. lstrip(substepName) would strip any characters from
                # that set and could eat the start of the line; remove the
                # substep name only as an exact prefix.
                if line.startswith(substepName):
                    line = line[len(substepName):]
            if removeTimestamp:
                line = line.lstrip('0123456789:-, ') # Remove timestamps in both serial and MP mode.
            if strip:
                line = line.strip()
            yield line, linecounter
388
389
390
def prettyXML(element, indent = ' ', poolFileCatalogFormat = False):
    '''Return a pretty-printed UTF-8 bytes rendering of an ElementTree element.

    @param element ElementTree element to serialise
    @param indent Indentation string used per nesting level
    @param poolFileCatalogFormat If True, wrap the output in a document with
           the classic POOLFILECATALOG DOCTYPE
    @return Pretty-printed XML as UTF-8 encoded bytes
    '''
    # Use minidom for pretty printing
    # See http://broadcast.oreilly.com/2010/03/pymotw-creating-xml-documents.html
    xmlstring = ElementTree.tostring(element, 'utf-8')
    try:
        metadataDoc = minidom.parseString(xmlstring)
    except ExpatError:
        # Getting weird \x00 NULLs on the end of some GUIDs, which minidom.parsestring does not like (is this APR?)
        msg.warning('Error parsing ElementTree string - will try removing hex literals ({0!r})'.format(xmlstring))
        # tostring() returned bytes, so the NULs must be stripped with bytes
        # arguments (str arguments raise TypeError on a bytes object).
        xmlstring = xmlstring.replace(b'\x00', b'')
        metadataDoc = minidom.parseString(xmlstring)


    if poolFileCatalogFormat is False:
        return metadataDoc.toprettyxml(indent=indent, encoding='UTF-8')

    # Now create a new document with the correct doctype for classic POOLFILECATALOG
    # See http://stackoverflow.com/questions/2337285/set-a-dtd-using-minidom-in-python
    imp = minidom.DOMImplementation()
    doctype = imp.createDocumentType(qualifiedName='POOLFILECATALOG', publicId='', systemId='InMemory')
    doc = imp.createDocument(None, 'POOLFILECATALOG', doctype)

    # Cut and paste the parsed document into the new one
    # See http://stackoverflow.com/questions/1980380/how-to-render-a-doctype-with-pythons-xml-dom-minidom
    refel = doc.documentElement
    for child in metadataDoc.childNodes:
        if child.nodeType==child.ELEMENT_NODE:
            # The real root element replaces the placeholder root
            doc.replaceChild(doc.importNode(child, True), doc.documentElement)
            refel= None
        elif child.nodeType!=child.DOCUMENT_TYPE_NODE:
            doc.insertBefore(doc.importNode(child, True), refel)

    return doc.toprettyxml(indent=indent, encoding='UTF-8')
432
433
434
437 return datetime.now().replace(microsecond=0).isoformat()
438
439
440
445def forceToAlphaNum(string):
446 if string is None or string.isalnum():
447 return string
448 newstring = ''
449 for piece in string:
450 if piece.isalnum():
451 newstring += piece
452 msg.warning("String {0} was stripped to alphanumeric characters only: {1}".format(string, newstring))
453 return newstring
454
455
456
466def cmpMetadata(metadata1, metadata2, guidCheck = 'valid'):
467 # First check we have the same files
468 allFiles = set(metadata1) | set(metadata2)
469 if len(allFiles) > len(metadata1) or len(allFiles) > len(metadata2):
470 msg.warning('In metadata comparison file lists are not equal - fails ({0} != {1}'.format(metadata1, metadata2))
471 return False
472 for fname in allFiles:
473 allKeys = set(metadata1[fname]) | set(metadata2[fname])
474 if len(allKeys) > len(metadata1[fname]) or len(allFiles) > len(metadata2[fname]):
475 msg.warning('In metadata comparison key lists are not equal - fails')
476 return False
477 for key in allKeys:
478 if key == 'file_guid':
479 if guidCheck == 'ignore':
480 continue
481 elif guidCheck == 'equal':
482 if metadata1[fname]['file_guid'].upper() == metadata2[fname]['file_guid'].upper():
483 continue
484 else:
485 msg.warning('In metadata comparison strict GUID comparison failed.')
486 return False
487 elif guidCheck == 'valid':
488 try:
489 uuid.UUID(metadata1[fname]['file_guid'])
490 uuid.UUID(metadata2[fname]['file_guid'])
491 continue
492 except ValueError:
493 msg.warning('In metadata comparison found invalid GUID strings.')
494 return False
495 if metadata1[fname][key] != metadata2[fname][key]:
496 msg.warning('In metadata comparison found different key values: {0!s} != {1!s}'.format(metadata1[fname][key], metadata2[fname][key]))
497 return True
498
499
500
def unpackTarFile(filename, directory="."):
    '''Unpack the tarball @c filename into @c directory.

    @raises trfExceptions.TransformSetupException On any extraction error
    '''
    try:
        # Context manager guarantees the tarfile is closed even when
        # extraction fails part way through (the old explicit close leaked).
        with tarfile.open(filename) as tar:
            # NOTE(review): extractall() trusts the archive contents; for
            # untrusted tarballs an extraction filter (Python 3.12+) should
            # be applied - confirm all inputs here are trusted.
            tar.extractall(path=directory)
    except Exception as e:
        errMsg = 'Error encountered while unpacking {0} to {1}: {2}'.format(filename, directory, e)
        msg.error(errMsg)
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'), errMsg)
512
513
514
def unpackDBRelease(tarball, dbversion=None):
    '''Ensure a DBRelease tarball is unpacked in the current directory.

    @param tarball Path to the DBRelease tarball
    @param dbversion Version string; when None it is parsed from the tarball name
    @return Tuple (unpackedNow, pathToSetupScript)
    @raises trfExceptions.TransformSetupException When the version cannot be
            determined or the setup script is missing after unpacking
    '''
    if dbversion is None:
        versionMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(tarball))
        if versionMatch is None:
            raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                        'Could not find a valid version in the DBRelease tarball: {0}'.format(tarball))
        dbversion = versionMatch.group(1)
    setupScript = path.abspath(path.join("DBRelease", dbversion, "setup.py"))
    # Already unpacked?
    if os.access(setupScript, os.R_OK):
        msg.debug('DBRelease {0} is already unpacked, found {1}'.format(tarball, setupScript))
        return False, setupScript
    msg.debug('Will attempt to unpack DBRelease {0}'.format(tarball))
    unpackTarFile(tarball)
    msg.info('DBRelease {0} was unpacked'.format(tarball))
    if not os.access(setupScript, os.R_OK):
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                    'DBRelease setup file {0} was not readable, even after unpacking {1}'.format(setupScript, tarball))
    return True, setupScript
541
542
def setupDBRelease(setup):
    '''Run the DBRelease setup script found at @c setup.

    Temporarily prepends the script's directory to sys.path so that the
    "setup" module can be imported, then instantiates its Setup class, which
    activates the DBRelease customisation.
    @raises trfExceptions.TransformSetupException On any import or setup error
    '''
    try:
        dbdir=path.abspath(path.dirname(setup))
        msg.debug('Will add {0} to sys.path to load DBRelease setup module'.format(dbdir))
        # N.B. We cannot use __import__ because the X.Y.Z directory name is illegal for a python module path
        # Bug fix: take a *copy* of sys.path - the old code aliased the list,
        # so the later restore was a no-op and dbdir stayed on sys.path.
        opath = list(sys.path)
        sys.path.insert(0, dbdir)
        try:
            from setup import Setup
            # Instantiate the Setup class, which activates the customisation
            Setup(dbdir)
        finally:
            # Always restore the original sys.path, even if the import or
            # Setup() call raised.
            sys.path = opath
        msg.debug('DBRelease setup module was initialised successfully')
    except ImportError as e:
        errMsg = 'Import error while trying to load DB Setup module: {0}'.format(e)
        msg.error(errMsg)
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'), errMsg)
    except Exception as e:
        errMsg = 'Unexpected error while trying to load DB Setup module: {0}'.format(e)
        msg.error(errMsg)
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'), errMsg)
565
566
567
def cvmfsDBReleaseCheck(dbrelease):
    '''Resolve a DBRelease version string to a CVMFS setup.py path.

    Tries the site-defined $VO_ATLAS_SW_DIR location first, then the
    standard CVMFS path.
    @return Path to the readable setup.py
    @raises trfExceptions.TransformSetupException When the version string is
            not recognised or no readable setup.py is found
    '''
    versionMatch = re.match(r'([\d\.]+|current)$', dbrelease)
    msg.debug('Attempting to setup DBRelease {0} from cvmfs'.format(dbrelease))
    if not versionMatch:
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                    'Unable to interpret DBRelease "{0}" as either a tarball or a CVMFS release directory'.format(dbrelease))

    if 'VO_ATLAS_SW_DIR' in os.environ:
        msg.debug('Found site defined path to ATLAS software: {0}'.format(os.environ['VO_ATLAS_SW_DIR']))
        dbsetup = path.join(os.environ['VO_ATLAS_SW_DIR'], 'database', 'DBRelease', dbrelease, 'setup.py')
        if os.access(dbsetup, os.R_OK):
            return dbsetup
        msg.warning('Site defined path to ATLAS software seems invalid (failed to access {0}). Will also try standard cvmfs path.'.format(dbsetup))
    else:
        msg.debug('Using standard CVMFS path to ATLAS software')

    dbsetup = path.join('/cvmfs/atlas.cern.ch/repo/sw/database/DBRelease', dbrelease, 'setup.py')
    if not os.access(dbsetup, os.R_OK):
        raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
                                                    'CVMFS DBRelease setup file {0} was not readable'.format(dbsetup))
    msg.debug('Using cvmfs based dbrelease: {0}'.format(path.dirname(dbsetup)))
    return dbsetup
594
595
596
def pickledDump(argdict):
    '''Pickle the transform argument dictionary to the file named by
    argdict['dumpPickle']; a no-op when that key is absent.'''
    if 'dumpPickle' not in argdict:
        return

    from PyJobTransforms.trfArgClasses import argument
    dumpDict = {}
    for key, value in argdict.items():
        if key == 'dumpPickle':
            continue
        if isinstance(value, argument):
            # For argument instances prefer the dedicated dumpvalue
            dumpDict[key] = getattr(value, "dumpvalue", value.value)
        else:
            dumpDict[key] = value
    import pickle
    with open(argdict['dumpPickle'], 'wb') as pickleFile:
        pickle.dump(dumpDict, pickleFile)
616
617
618
def JSONDump(argdict):
    '''Dump the transform argument dictionary as JSON to the file named by
    argdict['dumpJSON']; a no-op when that key is absent.'''
    if 'dumpJSON' not in argdict:
        return

    from PyJobTransforms.trfArgClasses import argument
    dumpDict = {}
    for key, value in argdict.items():
        if key == 'dumpJSON':
            continue
        if isinstance(value, argument):
            # For argument instances prefer the dedicated dumpvalue
            dumpDict[key] = getattr(value, "dumpvalue", value.value)
        else:
            dumpDict[key] = value
    import json
    with open(argdict['dumpJSON'], 'w') as JSONFile:
        json.dump(dumpDict, JSONFile, sort_keys=True, indent=2)
635
636
def convertToStr(in_string):
    '''Recursively convert unicode to str and decode bytes to str, walking
    through dicts and lists; anything else is returned unchanged.'''
    if isinstance(in_string, dict):
        return {convertToStr(key): convertToStr(value) for key, value in in_string.items()}
    if isinstance(in_string, list):
        return [convertToStr(element) for element in in_string]
    # Unicode is always str in Python3, but bytes are not
    # TODO: remove unicode comparison after Python 3 migration
    typeName = in_string.__class__.__name__
    if typeName == 'unicode':
        return in_string.encode('utf-8')
    if typeName == 'bytes':
        return in_string.decode('utf-8')
    return in_string
651
652
653
def cliToKey(option):
    '''Convert a command-line option string (e.g. "--some-option") to its
    argdict key form ("some_option").'''
    stripped = option.lstrip('-')
    return stripped.replace('-', '_')
656
657
658
def printHR(the_object):
    '''Print an object in a human-readable way: dicts as sorted "key: value"
    lines, lists/tuples one element per line, anything else directly.'''
    # dictionary
    if isinstance(the_object, dict):
        for key, value in sorted(the_object.items()):
            print(u'{key}: {value}'.format(key = key, value = value))
        return
    # list or tuple
    if isinstance(the_object, (list, tuple)):
        for element in the_object:
            print(element)
        return
    # other
    print(the_object)
674
675
676
681 return str(base64.urlsafe_b64encode(uuid.uuid4().bytes).strip("="))
682
683
684
694 quantity = None,
695 unitSingular = "unit",
696 unitPlural = "units"
697 ):
698 if quantity == 1:
699 return unitSingular
700 else:
701 return unitPlural
702
703
# @brief Returns whether the current job is running in an interactive environment.
706 isInteractiveEnv = False
707 # PS1 is for sh, bash; prompt is for tcsh and zsh
708 if 'PS1' in os.environ or 'prompt' in os.environ:
709 isInteractiveEnv = True
710 elif os.isatty(sys.stdout.fileno()) or os.isatty(sys.stdin.fileno()):
711 isInteractiveEnv = True
712
713 return isInteractiveEnv
714
715
716
725class Job(object):
726
727
729 self,
730 workFunction = None,
731 workFunctionKeywordArguments = {},
732 workFunctionTimeout = None,
733 name = None,
734 ):
735 self.workFunction = workFunction
736 self.workFunctionKeywordArguments = workFunctionKeywordArguments
737 self.workFunctionTimeout = workFunctionTimeout
738 self.className = self.__class__.__name__
739 self.resultGetter = None
740 if name is None:
742 else:
743 self._name = name
744 if self.workFunction is None:
745 exceptionMessage = "work function not specified"
746 msg.error("{notifier}: exception message: {exceptionMessage}".format(
747 notifier = self.className,
748 exceptionMessage = exceptionMessage
749 ))
751 trfExit.nameToCode('TRF_INTERNAL'),
752 exceptionMessage
753 )
754
    @property
    def name(self):
        '''Name of this job (read-only).'''
        return self._name
758
759
763 def __str__(self):
764 descriptionString = ""
765 for key, value in sorted(vars(self).items()):
766 descriptionString += str("{key}:{value} ".format(
767 key = key,
768 value = value)
769 )
770 return descriptionString
771
772
    def printout(self):
        '''Print in a human-readable way all data attributes of this Job.'''
        printHR(vars(self))
777
778
779
789class JobGroup(object):
790
791
793 self,
794 jobs = None,
795 name = None,
796 timeout = None
797 ):
798 self.jobs = jobs
799 self.className = self.__class__.__name__
800 self.completeStatus = False
802 if name is None:
804 else:
805 self._name = name
806 #self.timeStampSubmissionComplete = None #delete
807 if timeout is None:
808 self.timeout = 0
809 for job in self.jobs:
810 self.timeout += job.workFunctionTimeout
811 self.results = []
812
    @property
    def name(self):
        '''Name of this job group (read-only).'''
        return self._name
816
817
821 def __str__(self):
822 descriptionString = ""
823 for key, value in sorted(vars(self).items()):
824 descriptionString += str("{key}:{value} ".format(
825 key = key,
826 value = value)
827 )
828 return descriptionString
829
830
837 def timeoutStatus(self):
838 # If the JobGroup is complete or not submitted, then it is not timed
839 # out.
840 if self.completeStatus is True or self.timeStampSubmission is None:
841 return False
842 # If the JobGroup is not complete or submitted, then it may be timed
843 # out.
844 elif time.time() > self.timeout + self.timeStampSubmission:
845 return True
846 else:
847 return False
848
849
    def printout(self):
        '''Print in a human-readable way all data attributes of this JobGroup.'''
        printHR(vars(self))
854
855
856
858 # Multiprocessing uses signals to communicate with subprocesses, so the
859 # following two lines prevent the transforms signal handlers from
860 # interfering:
861 from PyJobTransforms.trfSignal import resetTrfSignalHandlers
862 resetTrfSignalHandlers()
863 signal.signal(signal.SIGINT, signal.SIG_IGN)
864
865
866
870
871
876 self,
877 jobSubmission = None,
878 numberOfProcesses = multiprocessing.cpu_count(), # noqa: B008 (cpu_count is constant)
879 # name of start method used for starting processes,
880 # 'fork', 'spawn', 'forkserver'
881 # "fork" is default prior to Python 3.14, planned to change in Python 3.14
882 startMethod="fork"
883 ):
884 self.jobSubmission = jobSubmission
885 self.numberOfProcesses = numberOfProcesses
886 self.startMethod = startMethod
887 self.className = self.__class__.__name__
888 msg.debug(f"{self.className}: current process: {multiprocessing.current_process().name}")
889 self.status = "starting"
890 msg.debug("{notifier}: status: {status}".format(
891 notifier = self.className,
892 status = self.status)
893 )
894 self.countOfJobs = None
896
897 self.pool = multiprocessing.pool.Pool(
899 initialise_processes,
900 (),
901 None,
902 multiprocessing.get_context(self.startMethod)
903 )
904 msg.debug("{notifier}: pool of {numberOfProcesses} {units} with start method {startMethod!r} created".format(
905 notifier = self.className,
906 numberOfProcesses = str(self.numberOfProcesses),
907 units = units(quantity = self.numberOfProcesses,
908 unitSingular = "process", unitPlural = "processes"),
909 startMethod = self.startMethod,
910 ))
911 self.status = "ready"
912 msg.debug("{notifier}: status: {status}".format(
913 notifier = self.className,
914 status = self.status
915 ))
916
917
921 def __str__(self):
922 descriptionString = ""
923 for key, value in sorted(vars(self).items()):
924 descriptionString += str("{key}:{value} ".format(
925 key = key,
926 value = value
927 ))
928 return descriptionString
929
930
933 def printout(self):
934 printHR(vars(self)
935 )
936
937
942 self,
943 jobSubmission = None
944 ):
945 # If the input submission is not None, then update the jobSubmission
946 # data attribute to that specified for this method.
947 if jobSubmission is not None:
948 self.jobSubmission = jobSubmission
949 self.status = "submitted"
950 msg.debug("{notifier}: status: {status}".format(
951 notifier = self.className,
952 status = self.status
953 ))
954 # If the input submission is a Job object, contain it in a JobGroup
955 # object.
956 if isinstance(self.jobSubmission, Job):
957 jobGroup = JobGroup(
958 jobs = [self.jobSubmission,],
959 )
960 self.jobSubmission = jobGroup
961 # Count the number of jobs.
962 self.countOfJobs = len(self.jobSubmission.jobs)
964 # Build a contemporary list of the names of jobs.
966 for job in self.jobSubmission.jobs:
967 self.listOfNamesOfRemainingJobs.append(job.name)
968 msg.debug("{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
969 notifier = self.className,
970 name = self.jobSubmission.name,
971 countOfJobs = self.countOfJobs,
972 units = units(
973 quantity = self.countOfRemainingJobs,
974 unitSingular = "job",
975 unitPlural = "jobs"
976 )
977 ))
978 msg.debug(self.statusReport())
979 msg.debug("{notifier}: submitting job group submission '{name}' to pool".format(
980 notifier = self.className,
981 name = self.jobSubmission.name
982 ))
983 # Cycle through all jobs in the input submission and apply each to the
984 # pool.
985 for job in self.jobSubmission.jobs:
986 job.timeStampSubmission = time.time()
987 msg.debug("{notifier}: job '{name}' submitted to pool".format(
988 notifier = self.className,
989 name = job.name
990 ))
991 # Apply the job to the pool, applying the object
992 # multiprocessing.pool.ApplyResult (AsyncResult alias thereof)
993 # to the job as a data attribute.
994 job.resultGetter = self.pool.apply_async(
995 func = job.workFunction,
996 kwds = job.workFunctionKeywordArguments
997 )
998 # Prepare monitoring of job group times in order to detect a job group
999 # timeout by recording the time of complete submission of the job group.
1000 self.jobSubmission.timeStampSubmission = time.time()
1001 msg.debug("{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
1002 notifier = self.className,
1003 countOfJobs = self.countOfJobs,
1004 units = units(
1005 quantity = self.countOfJobs,
1006 unitSingular = "job",
1007 unitPlural = "jobs"
1008 ),
1009 timeStampSubmission = self.jobSubmission.timeStampSubmission
1010 ))
1011 self.status = "processing"
1012 msg.debug("{notifier}: status: {status}".format(
1013 notifier = self.className,
1014 status = self.status
1015 ))
1016 return 0
1017
1018
1022 def getResults(self):
1023 # While the number of jobs remaining is greater than zero, cycle over
1024 # all jobs in the JobGroup object submission submission, watching for a
1025 # timeout of the JobGroup object submission. If a result has not been
1026 # retrieved for a job (i.e. the Job object does not have a result data
1027 # attribute), then check if a result is available for the job (using the
1028 # method multiprocessing.pool.AsyncResult.ready()). If a result is
1029 # available for the job, then check if the job has run successfully
1030 # (using the method multiprocessing.pool.AsyncResult.successful()). If
1031 # the job has not been successful, raise an exception, otherwise, get
1032 # the result of the job and save it to the result data attribute of the
1033 # job.
1034 msg.debug("{notifier}: checking for job {units}".format(
1035 notifier = self.className,
1036 units = units(
1037 quantity = self.countOfRemainingJobs,
1038 unitSingular = "result",
1039 unitPlural = "results")
1040 )
1041 )
1042 while self.countOfRemainingJobs > 0:
1043 # Check for timeout of the job group. If the current timestamp is
1044 # greater than the job group timeout (derived from the sum of the
1045 # set of all job timeout specifications in the job group) + the job
1046 # group submission timestamp, then raise an excepton, otherwise
1047 # cycle over all jobs.
1048 # Allow time for jobs to complete.
1049 time.sleep(5.0)
1050 if self.jobSubmission.timeoutStatus():
1051 msg.error("{notifier}: job group '{name}' timed out".format(
1052 notifier = self.className,
1053 name = self.jobSubmission.name
1054 ))
1055 self._abort()
1056 exceptionMessage = "timeout of a function in list {listOfNamesOfRemainingJobs}".format(
1057 listOfNamesOfRemainingJobs = self.listOfNamesOfRemainingJobs
1058 )
1059 msg.error("{notifier}: exception message: {exceptionMessage}".format(
1060 notifier = self.className,
1061 exceptionMessage = exceptionMessage
1062 ))
1064 trfExit.nameToCode('TRF_EXEC_TIMEOUT'),
1065 exceptionMessage
1066 )
1067 else:
1068 for job in self.jobSubmission.jobs:
1069 if not hasattr(job, 'result'):
1070 # If the result of the job is ready...
1071 if job.resultGetter.ready():
1072 msg.debug(
1073 "{notifier}: result ready for job '{name}'".format(
1074 notifier = self.className,
1075 name = job.name
1076 )
1077 )
1078 job.successStatus = job.resultGetter.successful()
1079 msg.debug(
1080 "{notifier}: job '{name}' success status: {successStatus}".format(
1081 notifier = self.className,
1082 name = job.name,
1083 successStatus = job.successStatus
1084 )
1085 )
1086 # If the job was successful, create the result data
1087 # attribute of the job and save the result to it.
1088 if job.successStatus:
1089 job.result = job.resultGetter.get()
1090 msg.debug(
1091 "{notifier}: result of job '{name}': {result}".format(
1092 notifier = self.className,
1093 name = job.name,
1094 result = job.result
1095 )
1096 )
1097 self.countOfRemainingJobs -= 1
1098 self.listOfNamesOfRemainingJobs.remove(job.name)
1099 msg.debug(
1100 "{notifier}: {countOfRemainingJobs} {units} remaining".format(
1101 notifier = self.className,
1102 countOfRemainingJobs = self.countOfRemainingJobs,
1103 units = units(
1104 quantity = self.countOfRemainingJobs,
1105 unitSingular = "job",
1106 unitPlural = "jobs"
1107 )
1108 )
1109 + f"\n names of remaining jobs: {self.listOfNamesOfRemainingJobs}"
1110 )
1111 # If the job was not successful, raise an exception
1112 # and abort processing.
1113 elif not job.successStatus:
1114 msg.error(
1115 "{notifier}: job '{name}' failed".format(
1116 notifier = self.className,
1117 name = job.name
1118 )
1119 )
1120 self._abort()
1121 exceptionMessage = "failure of function '{name}' with arguments {arguments}".format(
1122 name = job.workFunction.__name__,
1123 arguments = job.workFunctionKeywordArguments
1124 )
1125 msg.error("{notifier}: exception message: {exceptionMessage}".format(
1126 notifier = self.className,
1127 exceptionMessage = exceptionMessage
1128 ))
1130 trfExit.nameToCode('TRF_EXEC_FAIL'),
1131 exceptionMessage
1132 )
1133 # All results having been returned, create the 'results' list data
1134 # attribute of the job group and append all individual job results to
1135 # it.
1136 self.jobSubmission.timeStampComplete = time.time()
1137 self.jobSubmission.completeStatus = True
1138 msg.debug("{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
1139 notifier = self.className,
1140 countOfJobs = self.countOfJobs,
1141 units = units(
1142 quantity = self.countOfJobs,
1143 unitSingular = "job",
1144 unitPlural = "jobs"
1145 ),
1146 timeStampComplete = self.jobSubmission.timeStampComplete
1147 ))
1148 self.jobSubmission.processingTime = self.jobSubmission.timeStampComplete - self.jobSubmission.timeStampSubmission
1149 msg.debug("{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
1150 notifier = self.className,
1151 countOfJobs = self.countOfJobs,
1152 units = units(
1153 quantity = self.countOfJobs,
1154 unitSingular = "job",
1155 unitPlural = "jobs"
1156 ),
1157 processingTime = self.jobSubmission.processingTime
1158 ))
1159 for job in self.jobSubmission.jobs:
1160 self.jobSubmission.results.append(job.result)
1161 self._terminate()
1162 return self.jobSubmission.results
1163
1164
1168 def statusReport(self):
1169 statusReport = "{notifier}:\n status report:".format(
1170 notifier = self.className
1171 )
1172 # information on parallel job processor
1173 statusReport += "\n parallel job processor configuration:"
1174 statusReport += "\n status: {notifier}".format(
1175 notifier = str(self.status)
1176 )
1177 statusReport += "\n number of processes: {notifier}".format(
1178 notifier = str(self.numberOfProcesses)
1179 )
1180 # information on job group submission
1181 statusReport += "\n job group submission: '{notifier}'".format(
1182 notifier = self.jobSubmission.name
1183 )
1184 statusReport += "\n total number of jobs: {notifier}".format(
1185 notifier = str(self.countOfJobs)
1186 )
1187 statusReport += "\n number of incomplete jobs: {notifier}".format(
1188 notifier = str(self.countOfRemainingJobs)
1189 )
1190 statusReport += "\n names of incomplete jobs: {notifier}".format(
1191 notifier = self.listOfNamesOfRemainingJobs
1192 )
1193 # information on jobs (if existent)
1194 if self.jobSubmission.jobs:
1195 statusReport += "\n jobs:"
1196 for job in self.jobSubmission.jobs:
1197 statusReport += "\n job '{name}':".format(
1198 name = job.name
1199 )
1200 statusReport += "\n workFunction: '{name}'".format(
1201 name = job.workFunction.__name__
1202 )
1203 statusReport += "\n workFunctionKeywordArguments: '{arguments}'".format(
1204 arguments = job.workFunctionKeywordArguments
1205 )
1206 statusReport += "\n workFunctionTimeout: '{timeout}'".format(
1207 timeout = job.workFunctionTimeout
1208 )
1209 if hasattr(job, 'result'):
1210 statusReport += "\n result: '{result}'".format(
1211 result = job.result
1212 )
1213 # statistics of parallel job processor run
1214 if hasattr(self.jobSubmission, 'processingTime'):
1215 statusReport += "\n statistics:"
1216 if hasattr(self.jobSubmission, 'processingTime'):
1217 statusReport += "\n total processing time: {processingTime} s".format(
1218 processingTime = self.jobSubmission.processingTime
1219 )
1220 return statusReport
1221
1222
1225 def _abort(self):
1226 self.status = "aborting"
1227 msg.debug("{notifier}: status: {status}".format(
1228 notifier = self.className,
1229 status = self.status
1230 ))
1231 self._terminate()
1232
1233
1238 def _terminate(self):
1239 self.status = "terminating"
1240 msg.debug("{notifier}: status: {status}".format(
1241 notifier = self.className,
1242 status = self.status
1243 ))
1244 msg.debug("{notifier}: terminating pool of {numberOfProcesses} {units}".format(
1245 notifier = self.className,
1246 numberOfProcesses = str(self.numberOfProcesses),
1247 units = units(
1248 quantity = self.numberOfProcesses,
1249 unitSingular = "process",
1250 unitPlural = "processes"
1251 )
1252 ))
1253 self.pool.terminate()
1254 self.pool.join()
1255 self.status = "finished"
1256 msg.debug("{notifier}: status: {status}".format(
1257 notifier = self.className,
1258 status = self.status
1259 ))
1260 msg.debug(self.statusReport())
1261
1262
class analytic():
    """Analytics service: fit tabular monitoring data and report slopes.

    Wraps the Fit class to perform (currently linear) fits of memory
    monitor data, e.g. PSS versus time, whose slope measures memory leaks.
    """

    _fit = None

    def __init__(self, **kwargs):
        self._fit = None

    ## Fitting function. For a linear model: y(x) = slope * x + intersect
    #  @param x: list of x values
    #  @param y: list of y values
    #  @param model: fit model name (only 'linear' is implemented)
    #  @return: Fit object, or None if the fit failed
    def fit(self, x, y, model='linear'):
        try:
            self._fit = Fit(x=x, y=y, model=model)
        except Exception as e:
            msg.warning('fit failed! {0}'.format(e))

        return self._fit

    # Return the slope of a linear fit, y(x) = slope * x + intersect
    def slope(self):
        slope = None

        if self._fit:
            slope = self._fit.slope()
        else:
            msg.warning('Fit has not been defined')

        return slope

    # Return a properly formatted job metrics string with analytics data.
    # Currently the function returns a fit for 'pss' vs 'time', whose slope measures memory leaks.
    # @param filename: memory monitor output file (string).
    # @param x_name: optional string, name selector for table column.
    # @param y_name: optional string, name selector for table column.
    # @param precision: optional precision for fitted slope parameter, default 2.
    # @param tails: should tails be used? (boolean).
    # @param minPoints: minimum desired points of data to be fitted (after removing tail)
    # @return: {"slope": slope, "chi2": chi2} (empty dict if no fit was possible)
    def getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5):
        _memFileToTable = memFileToTable()
        fitResult = {}
        table = _memFileToTable.getTable(filename, header=None, separator="\t")
        if table:
            # extract data to be fitted
            x, y = self.extractFromTable(table, x_name, y_name)
            # remove tails if desired
            # this is useful e.g. for memory monitor data where the first and last values
            # represent allocation and de-allocation, ie not interesting
            # here tail is defined to be first and last 20% of data
            if not tails:
                tail = int(len(x)/5)
                msg.info('removing tails from the memory monitor data; 20% from each side')
                # bug fix: only trim when there is a non-zero tail to remove;
                # previously x[:-0] silently emptied the whole list for
                # samples with fewer than 5 points
                if tail > 0:
                    x = x[tail:-tail]
                    y = y[tail:-tail]

            if len(x)==len(y) and len(x) > minPoints:
                msg.info('fitting {0} vs {1}'.format(y_name, x_name))
                try:
                    fit = self.fit(x, y)
                    _slope = self.slope()
                except Exception as e:
                    msg.warning('failed to fit data, x={0}, y={1}: {2}'.format(x, y, e))
                else:
                    if _slope:
                        slope = round(fit.slope(), precision)
                        chi2 = round(fit.chi2(), precision)
                        fitResult = {"slope": slope, "chi2": chi2}
                        if slope:
                            HRslope, unit = self.formatBytes(slope)
                            msg.info('slope of the fitted line: {0} {1} (using {2} data points, chi2={3})'.format(HRslope, unit, len(x), chi2))
            else:
                msg.warning('wrong length of table data, x={0}, y={1} (must be same and length>={2})'.format(x, y, minPoints))

        return fitResult

    # Extract wanted columns. e.g. x: Time , y: pss+swap
    # @param x_name: column name to be extracted (string).
    # @param y_name: column name to be extracted (may contain '+'-sign) (string).
    # @return: x (list), y (list).
    def extractFromTable(self, table, x_name, y_name):
        headerUpperVersion = {'pss':'PSS', 'swap':'Swap', 'rss':'RSS', 'vmem':'VMEM'}
        x = table.get(x_name, [])
        if '+' not in y_name:
            y = table.get(y_name, [])
            if len(y)==0:
                # fall back to the upper-case variant of the column name
                y = table.get(headerUpperVersion[y_name], [])
        else:
            try:
                y1_name = y_name.split('+')[0]
                y2_name = y_name.split('+')[1]
                y1_value = table.get(y1_name, [])
                y2_value = table.get(y2_name, [])
                if len(y1_value)==0 or len(y2_value)==0:
                    y1_value = table.get(headerUpperVersion[y1_name], [])
                    y2_value = table.get(headerUpperVersion[y2_name], [])
            except Exception as e:
                msg.warning('exception caught: {0}'.format(e))
                x = []
                y = []
            else:
                # create new list with added values (1,2,3) + (4,5,6) = (5,7,9)
                y = [x0 + y0 for x0, y0 in zip(y1_value, y2_value)]

        return x, y

    # Make the result of slope human readable (HR)
    # default unit is KB; NOTE(review): negative slopes are never scaled up
    # by the loop and always report as KB/s — presumably acceptable, confirm
    def formatBytes(self, size):
        # decimal system
        power = 1000
        n = 1
        power_labels = {1: 'K', 2: 'M', 3: 'G', 4: 'T'}
        while size > power:
            size /= power
            n += 1
        return round(size, 2), power_labels[n]+'B/s'
1383
1384
1385
class Fit():
    """Low-level linear fitting helper: y(x) = slope * x + intersect.

    All derived quantities are left as None when the input data is
    missing or the model cannot be computed.
    """
    _model = 'linear' # fitting model
    _x = None # x values
    _y = None # y values
    _xm = None # x mean
    _ym = None # y mean
    _ss = None # sum of square deviations
    _ss2 = None # sum of deviations
    _slope = None # slope
    _intersect = None # intersect
    _chi2 = None # chi2

    def __init__(self, **kwargs):
        """Extract the parameters (x, y, model) and run the base calculations."""
        # extract parameters
        self._model = kwargs.get('model', 'linear')
        self._x = kwargs.get('x', None)
        self._y = kwargs.get('y', None)
        self._math = math()

        if not self._x or not self._y:
            # bug fix: previously processing continued and crashed with a
            # TypeError on len(None); leave all fit results as None instead
            msg.warning('input data not defined')
            return

        if len(self._x) != len(self._y):
            # NOTE: processing deliberately continues; zip() in the sums
            # truncates to the shorter list
            msg.warning('input data (lists) have different lengths')

        # base calculations
        if self._model == 'linear':
            self._ss = self._math.sum_square_dev(self._x)
            self._ss2 = self._math.sum_dev(self._x, self._y)
            self.set_slope()
            self._xm = self._math.mean(self._x)
            self._ym = self._math.mean(self._y)
            self.set_intersect()
            self.set_chi2()

        else:
            msg.warning("\'{0}\' model is not implemented".format(self._model))

    def fit(self):
        """Return the fitting object itself."""
        return self

    def value(self, t):
        """Return the value y(x=t) of the linear fit y(x) = slope * x + intersect."""
        return self._slope * t + self._intersect

    def set_chi2(self):
        """Calculate and set the chi2 value from observed vs expected y values."""
        y_observed = self._y
        y_expected = []
        for x in self._x:
            y_expected.append(self.value(x))
        if y_observed and y_observed != [] and y_expected and y_expected != []:
            self._chi2 = self._math.chi2(y_observed, y_expected)
        else:
            self._chi2 = None

    def chi2(self):
        """Return the chi2 value (None when not computed)."""
        return self._chi2

    def set_slope(self):
        """Calculate and set the slope of the linear fit."""
        if self._ss2 and self._ss and self._ss != 0:
            self._slope = self._ss2 / self._ss
        else:
            self._slope = None

    def slope(self):
        """Return the slope value (None when not computed)."""
        return self._slope

    def set_intersect(self):
        """Calculate and set the intersect of the linear fit."""
        if self._ym and self._slope and self._xm:
            self._intersect = self._ym - self._slope * self._xm
        else:
            self._intersect = None

    def intersect(self):
        """Return the intersect value (None when not computed)."""
        return self._intersect
1468
1469
1470
class math():
    """Small collection of statistical helpers used by the Fit class.

    NOTE: this deliberately shadows the stdlib 'math' module name within
    this module; kept as-is for backward compatibility with callers.
    """

    def mean(self, data):
        """Return the sample arithmetic mean of data."""
        count = len(data)
        if count < 1:
            msg.warning('mean requires at least one data point')
        return sum(data) / count

    def sum_square_dev(self, data):
        """Return the sum of square deviations: Sum (x - x_mean)**2."""
        centre = self.mean(data)
        return sum((value - centre) ** 2 for value in data)

    def sum_dev(self, x, y):
        """Return the sum of deviations: Sum (x - x_mean)*(y - y_mean)."""
        xMean = self.mean(x)
        yMean = self.mean(y)
        return sum((xi - xMean) * (yi - yMean) for xi, yi in zip(x, y))

    def chi2(self, observed, expected):
        """Return the chi2 sum of the observed and expected values.

        Returns 0.0 when any expected value is zero, avoiding a division
        by zero.
        """
        if 0 in expected:
            return 0.0
        return sum((obs - exp) ** 2 / exp for obs, exp in zip(observed, expected))
1498
1499
1500
1512
1513 def getTable(self, filename, header=None, separator="\t"):
1514 tabledict = {}
1515 keylist = []
1516 try:
1517 f = open(filename, 'r')
1518 except Exception as e:
1519 msg.warning("failed to open file: {0}, {1}".format(filename, e))
1520 else:
1521 firstline = True
1522 for line in f:
1523 fields = line.split(separator)
1524 fields = [''.join(filter(str.isprintable, field)) for field in fields]
1525 if firstline:
1526 firstline = False
1527 tabledict, keylist = self._defineTableDictKeys(header, fields, separator)
1528 if not header:
1529 continue
1530 # from now on, fill the dictionary fields with the input data
1531 i = 0
1532 for field in fields:
1533 # get the corresponding dictionary key from the keylist
1534 key = keylist[i]
1535 # store the field value in the correct list
1536 tabledict[key].append(float(field))
1537 i += 1
1538 f.close()
1539
1540 return tabledict
1541
1542
1547 def _defineTableDictKeys(self, header, fields, separator):
1548 tabledict = {}
1549 keylist = []
1550
1551 if not header:
1552 # get the dictionary keys from the header of the file
1553 for key in fields:
1554 # first line defines the header, whose elements will be used as dictionary keys
1555 if key == '':
1556 continue
1557 if key.endswith('\n'):
1558 key = key[:-1]
1559 tabledict[key] = []
1560 keylist.append(key)
1561 else:
1562 # get the dictionary keys from the provided header
1563 keys = header.split(separator)
1564 for key in keys:
1565 if key == '':
1566 continue
1567 if key.endswith('\n'):
1568 key = key[:-1]
1569 tabledict[key] = []
1570 keylist.append(key)
1571
1572 return tabledict, keylist
1573
1574
1575
## @brief Return a Valgrind command for use with Athena.
#  @detail The command is roughly of the form
#  valgrind [options] $(which python) $(which athena.py) athenaConf.pkl
#  (NOTE(review): the 'def' header line was lost in extraction and is
#  restored here from the documented signature — confirm against VCS)
#  @param defaultOptions include the default Valgrind options (Boolean)
#  @param extraOptionsList list of extra Valgrind options
#  @param AthenaSerialisedConfigurationFile Athena serialised configuration file
#  @param returnFormat "string" (default) or "list"
#  @return the command as a string or a list
def ValgrindCommand(
    defaultOptions = True,
    extraOptionsList = None,
    AthenaSerialisedConfigurationFile = "athenaConf.pkl",
    returnFormat = "string"
    ):

    # Access Valgrind suppressions files by finding the paths from
    # environment variables. Append the files to the Valgrind suppressions
    # options.
    suppressionFilesAndCorrespondingPathEnvironmentVariables = {
        "etc/valgrind-root.supp": "ROOTSYS",
        "Gaudi.supp": "DATAPATH",
        "oracleDB.supp": "DATAPATH",
        "valgrindRTT.supp": "DATAPATH",
        "root.supp": "DATAPATH"
    }
    optionsList = ["valgrind"]
    # If default options are not suppressed, use them.
    if defaultOptions:
        optionsList.append("--num-callers=30")
        optionsList.append("--tool=memcheck")
        optionsList.append("--leak-check=full")
        optionsList.append("--smc-check=all")
    # If extra options are specified, append them to the existing options.
    if extraOptionsList:
        for option in extraOptionsList:
            optionsList.append(option)
    # Add suppression files and athena commands
    for suppressionFile, pathEnvironmentVariable in suppressionFilesAndCorrespondingPathEnvironmentVariables.items():
        # bug fix: use .get() so an unset environment variable does not
        # raise a KeyError; it falls through to the same warning instead
        pathvar = os.environ.get(pathEnvironmentVariable)
        suppFile = findFile(pathvar, suppressionFile) if pathvar else None
        if suppFile:
            optionsList.append("--suppressions=" + suppFile)
        else:
            msg.warning("Bad path to suppression file: {sfile}, {path} not defined".format(
                sfile = suppressionFile, path = pathEnvironmentVariable)
            )
    optionsList.append("$(which python)")
    optionsList.append("$(which athena.py)")
    optionsList.append(AthenaSerialisedConfigurationFile)
    # Return the command in the requested format, string (by default) or list.
    if returnFormat is None or returnFormat == "string":
        return(" ".join(optionsList))
    elif returnFormat == "list":
        return(optionsList)
    else:
        print(
            "error: invalid Valgrind command format request (requested " +
            "format: {format}; valid formats: string, list)".format(
                format = returnFormat
            ))
        # ValueError is a subclass of Exception, so callers catching the
        # previous bare Exception still work
        raise ValueError("invalid Valgrind command format request: {0}".format(returnFormat))
1640
1641
## @brief Return a VTune command for use with Athena.
#  @detail setsid is prepended because VTune otherwise kills the entire
#  job / terminal session when it finishes.
#  (NOTE(review): the 'def' header line was lost in extraction and is
#  restored here from the documented signature — confirm against VCS)
#  @param defaultOptions include the default VTune options (Boolean)
#  @param extraOptionsList list of extra VTune options; when no "-collect="
#         option is supplied, "-collect=hotspots" is added automatically
#  @param AthenaCommand the Athena command to profile (list; only read,
#         never mutated, so the shared mutable default is safe here)
#  @param returnFormat "string" (default) or "list"
#  @return the command as a string or a list
def VTuneCommand(
    defaultOptions = True,
    extraOptionsList = None,
    AthenaCommand = ["athena.py", "athenaConf.pkl"],
    returnFormat = "string"
    ):

    # setsid prevents vtune killing the entire job / terminal session when it finishes
    optionsList = ["setsid", "vtune"]
    # If default options are not suppressed, use them.
    if defaultOptions:
        optionsList.append("-run-pass-thru=--no-altstack")
        optionsList.append("-mrte-mode=native")
    # If extra options are specified, append them to the existing options.
    isCollectSpecified = False
    if extraOptionsList:
        for option in extraOptionsList:
            optionsList.append(option)
            if option.startswith("-collect="):
                isCollectSpecified = True
    if not isCollectSpecified:
        optionsList.append("-collect=hotspots")
    optionsList.append("--")
    optionsList.extend(AthenaCommand)
    # Return the command in the requested format, string (by default) or list.
    if returnFormat is None or returnFormat == "string":
        return(" ".join(optionsList))
    elif returnFormat == "list":
        return(optionsList)
    else:
        print(
            "error: invalid VTune command format request (requested " +
            "format: {format}; valid formats: string, list)".format(
                format = returnFormat
            ))
        # ValueError is a subclass of Exception, so callers catching the
        # previous bare Exception still work
        raise ValueError("invalid VTune command format request: {0}".format(returnFormat))
1693
1694# calculate cpuTime from os.times() times tuple
# calculate cpuTime from os.times() times tuple
def calcCpuTime(start, stop):
    """Return the children's CPU time (user + system) between two
    os.times() samples, or None when either sample is missing.

    Indices 2 and 3 of an os.times() tuple are the children's user and
    system times respectively.
    """
    cpuTime = None
    if start and stop:
        # plain elementwise difference; clearer than the old reduce/map pair
        cpuTime = (stop[2] - start[2]) + (stop[3] - start[3])

    return cpuTime
1701
1702# calculate wallTime from os.times() times tuple
# calculate wallTime from os.times() times tuple
def calcWallTime(start, stop):
    """Return the elapsed wall-clock time between two os.times() samples
    (index 4 of the tuple), or None when either sample is missing.
    """
    if not (start and stop):
        return None
    return stop[4] - start[4]
1709
def bind_port(host, port):
    """Check whether a TCP socket can be bound to (host, port).

    @param host: host name or IP address to bind to
    @param port: TCP port number
    @return: 0 if the port could be bound, 1 otherwise
    """
    # local import: keeps this fix self-contained (file-level imports are
    # outside this block)
    import errno
    ret = 0
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.bind((host, port))
    except socket.error as e:
        # bug fix: compare against the symbolic constant instead of the
        # Linux-specific magic number 98 (EADDRINUSE differs per platform)
        if e.errno == errno.EADDRINUSE:
            print("Port %s is already in use" %port)
        else:
            # something else raised the socket.error exception
            print(e)
        # any bind failure means the port is unusable
        ret = 1
    s.close()
    return ret
1724
1725
1729
## @brief Summarise events passed the ISF_SimEventFilter.
#  @detail Sums up all events reported as accepted by the
#  ISF_SimEventFilter across the transform log file.
#  (NOTE(review): the 'def' header line was lost in extraction and is
#  restored here from the documented signature — confirm against VCS)
#  @param log path of the transform log file
#  @return total number of passed events, or None when no matching line
#          was found or the log could not be read
def reportEventsPassedSimFilter(log):

    # Currently the pattern which contains the information for passed events is for example like:
    # ISF_SimEventFilter INFO accepted 1 out of 10 events for filter ISF_SimEventFilter (SimEventFilter)
    # In case the filter name truncated by ... due to long timestamps, the pattern could still match
    # e.g. ISF_SimEventFi... or ISF_SimEventFil...
    regExp = re.compile(r'ISF_SimEventFi[lter|...]+\s.*INFO.*accepted\s*(?P<events>[0-9]*)\s*out of\s*(?P<total>[0-9]*).*')
    try:
        myGen = lineByLine(log)
    except IOError as e:
        msg.warning('Failed to open transform logfile {0}: {1:s}'.format(log, e))
        # bug fix: previously execution fell through and crashed with a
        # NameError on the undefined generator; report no events instead
        return None

    resimevents = None
    passed_events = 0
    total_events = 0
    for line, lineCounter in myGen:
        m = regExp.match(line)
        if m:
            passed_events += int(m.group('events'))
            total_events += int(m.group('total'))
            resimevents = passed_events

    if resimevents is not None:
        msg.info("Summary of events passed the ISF_SimEventFilter: {0} events of total {1}".format(passed_events, total_events) )
    else:
        msg.warning("Returning null value for the resimevents. No line matched with the regExp for extracting events passed the ISF_SimEventFilter")

    return resimevents
static void reduce(HepMC::GenEvent *ge, HepMC::GenParticle *gp)
Remove an unwanted particle from the event, collapsing the graph structure consistently.
Definition FixHepMC.cxx:40
int upper(int c)
void print(char *figname, TCanvas *c1)
STL class.
Base class for execution exceptions.
Exception used by time limited executions.
Low-level fitting class.
Definition trfUtils.py:1386
__init__(self, **kwargs)
Definition trfUtils.py:1398
JobGroup: a set of Job objects and pieces of information relevant to a given set of Job objects.
Definition trfUtils.py:789
__init__(self, jobs=None, name=None, timeout=None)
initialisation method
Definition trfUtils.py:797
printout(self)
print in a human-readable way the items of the object self
Definition trfUtils.py:852
__str__(self)
return an object self description string @ detail This method returns an object description string co...
Definition trfUtils.py:821
timeoutStatus(self)
return Boolean JobGroup timeout status
Definition trfUtils.py:837
Job: a set of pieces of information relevant to a given work function.
Definition trfUtils.py:725
__init__(self, workFunction=None, workFunctionKeywordArguments={}, workFunctionTimeout=None, name=None)
initialisation method
Definition trfUtils.py:734
printout(self)
print in a human-readable way the items of the object self
Definition trfUtils.py:775
__str__(self)
return an object self description string
Definition trfUtils.py:763
ParallelJobProcessor: a multiple-process processor of Job objects.
Definition trfUtils.py:869
__init__(self, jobSubmission=None, numberOfProcesses=multiprocessing.cpu_count(), startMethod="fork")
initialisation method that accepts submissions and starts pool
Definition trfUtils.py:883
submit(self, jobSubmission=None)
submit a Job object or a JobGroup object for processing
Definition trfUtils.py:944
statusReport(self)
return a status report string
Definition trfUtils.py:1168
printout(self)
print in a human-readable way the items of the object self
Definition trfUtils.py:933
__str__(self)
return an object self-description string
Definition trfUtils.py:921
_terminate(self)
terminate parallel job processor
Definition trfUtils.py:1238
_abort(self)
abort parallel job processor
Definition trfUtils.py:1225
getResults(self)
get results of JobGroup object submission
Definition trfUtils.py:1022
Analytics service class.
Definition trfUtils.py:1263
extractFromTable(self, table, x_name, y_name)
Definition trfUtils.py:1346
__init__(self, **kwargs)
Definition trfUtils.py:1267
getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5)
Definition trfUtils.py:1303
fit(self, x, y, model='linear')
Fitting function For a linear model: y(x) = slope * x + intersect.
Definition trfUtils.py:1275
some mathematical tools
Definition trfUtils.py:1471
sum_square_dev(self, data)
Definition trfUtils.py:1482
chi2(self, observed, expected)
Definition trfUtils.py:1494
sum_dev(self, x, y)
Definition trfUtils.py:1488
Extract a table of data from a txt file.
Definition trfUtils.py:1511
getTable(self, filename, header=None, separator="\t")
Definition trfUtils.py:1513
_defineTableDictKeys(self, header, fields, separator)
Define the keys for the tabledict dictionary.
Definition trfUtils.py:1547
STL class.
void mean(std::vector< double > &bins, std::vector< double > &values, const std::vector< std::string > &files, const std::string &histname, const std::string &tplotname, const std::string &label="")
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:310
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Transform argument class definitions.
Module for transform exit codes.
Signal handling utilities for ATLAS job transforms.
releaseIsOlderThan(major, minor=None)
Test (to the best of our knowledge) if the current release is older than a major, minor version numbe...
Definition trfUtils.py:263
isodate()
Return isoformated 'now' string.
Definition trfUtils.py:436
ValgrindCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
Definition trfUtils.py:1593
shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
Definition trfUtils.py:363
listChildren(psTree=None, parent=os.getpid(), listOrphans=False)
Find all the children of a particular PID (calls itself recursively to descend into each leaf)
Definition trfUtils.py:111
unpackDBRelease(tarball, dbversion=None)
Ensure that the DBRelease tarball has been unpacked.
Definition trfUtils.py:522
pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
Definition trfUtils.py:600
reportEventsPassedSimFilter(log)
summarize events passed the ISF_SimEventFilter @detail this function sums up all events passed the IS...
Definition trfUtils.py:1728
cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
Definition trfUtils.py:654
asetupReport()
Return a string with a report of the current athena setup.
Definition trfUtils.py:225
cmpMetadata(metadata1, metadata2, guidCheck='valid')
Compare metadata for files, but taking into account that GUID can vary.
Definition trfUtils.py:466
asetupReleaseIsOlderThan(asetup_string, major, minor=None)
Test (to the best of our knowledge) if the asetup release is older than a major, minor version number...
Definition trfUtils.py:305
lineByLine(filename, strip=True, removeTimestamp=True, substepName=None)
Generator to return lines and line count from a file.
Definition trfUtils.py:374
JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
Definition trfUtils.py:619
getAncestry(listMyOrphans=False)
List all processes and parents and form a dictionary where the parent key lists all child PIDs.
Definition trfUtils.py:64
uniqueIdentifier()
return a URL-safe, base 64-encoded pseudorandom UUID
Definition trfUtils.py:680
cvmfsDBReleaseCheck(dbrelease)
Validate a DBRelease exists on cvmfs and return the path to the setup script.
Definition trfUtils.py:571
VTuneCommand(defaultOptions=True, extraOptionsList=None, AthenaCommand=["athena.py", "athenaConf.pkl"], returnFormat="string")
return VTune command @detail This function returns a VTune command for use with Athena.
Definition trfUtils.py:1659
calcWallTime(start, stop)
Definition trfUtils.py:1703
forceToAlphaNum(string)
Strip a string down to alpha-numeric characters only.
Definition trfUtils.py:445
printHR(the_object)
print in a human-readable way the items of a given object
Definition trfUtils.py:662
call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10)
Definition trfUtils.py:157
initialise_processes()
initisation procedure for processes of process pool
Definition trfUtils.py:857
findFile(pathvar, fname)
Find a named file along a colon separated PATH type variable.
Definition trfUtils.py:39
setupDBRelease(setup)
Run a DBRelease setup.
Definition trfUtils.py:545
convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
Definition trfUtils.py:638
calcCpuTime(start, stop)
Definition trfUtils.py:1695
unpackTarFile(filename, directory=".")
Unpack a given tarfile.
Definition trfUtils.py:503
prettyXML(element, indent=' ', poolFileCatalogFormat=False)
XML pretty print an ElementTree.ELement object.
Definition trfUtils.py:399
units(quantity=None, unitSingular="unit", unitPlural="units")
return either singular or plural units as appropriate for a given quantity
Definition trfUtils.py:697
infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.
Definition trfUtils.py:134
bind_port(host, port)
Definition trfUtils.py:1710