ATLAS Offline Software
trfUtils.py
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 
6 
7 import os
8 import os.path as path
9 import re
10 import signal
11 import sys
12 import tarfile
13 import time
14 import uuid
15 import socket
16 
17 import multiprocessing
18 import base64
19 
20 from datetime import datetime
21 from subprocess import Popen, STDOUT, PIPE
22 from xml.dom import minidom
23 from xml.parsers.expat import ExpatError
24 from xml.etree import ElementTree
25 
26 from PyJobTransforms.trfExitCodes import trfExit
27 import PyJobTransforms.trfExceptions as trfExceptions
28 
29 import logging
30 from functools import reduce
31 msg = logging.getLogger(__name__)
32 
33 
34 
37 def findFile(pathvar, fname):
38  # First see if the file already includes a path.
39  msg.debug('Finding full path for {fileName} in path {path}'.format(
40  fileName = fname,
41  path = pathvar
42  ))
43  if fname.startswith('/'):
44  return(fname)
45 
46  # Split the path.
47  pathElements = pathvar.split(':')
48  for pathElement in pathElements:
49  if path.exists(path.join(pathElement, fname)):
50  return(path.join(pathElement, fname))
51 
52  return(None)
53 
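For example, a minimal usage sketch (the file name is hypothetical):

    import os
    from PyJobTransforms.trfUtils import findFile

    # Returns the first match along the colon-separated path variable, or None
    fullPath = findFile(os.environ['PATH'], 'myJobOptions.py')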
54 
55 
62 def getAncestry(listMyOrphans = False):
63  psCmd = ['ps', 'ax', '-o', 'pid,ppid,pgid,args', '-m']
64 
65  try:
66  msg.debug('Executing %s', psCmd)
67  p = Popen(psCmd, stdout=PIPE, stderr=PIPE)
68  stdout = p.communicate()[0]
69  psPID = p.pid
70  except OSError as e:
71  msg.error('Failed to execute "ps" to get process ancestry: %s', repr(e))
72  raise
73 
74  childDict = {}
75  myPgid = os.getpgrp()
76  myPid = os.getpid()
77  for line in stdout.decode().split('\n'):
78  try:
79  (pid, ppid, pgid, cmd) = line.split(None, 3)
80  pid = int(pid)
81  ppid = int(ppid)
82  pgid = int(pgid)
83  # Ignore the ps process
84  if pid == psPID:
85  continue
86  if ppid in childDict:
87  childDict[ppid].append(pid)
88  else:
89  childDict[ppid] = [pid]
90  if listMyOrphans and ppid == 1 and pgid == myPgid:
91  msg.info("Adding PID {0} to list of my children as it seems to be orphaned: {1}".format(pid, cmd))
92  if myPid in childDict:
93  childDict[myPid].append(pid)
94  else:
95  childDict[myPid] = [pid]
96 
97  except ValueError:
98  # Not a nice line
99  pass
100  return childDict
101 
102 
109 def listChildren(psTree = None, parent = os.getpid(), listOrphans = False): # noqa: B008 (PID is constant)
110  '''Take a psTree dictionary and list all children'''
111  if psTree is None:
112  psTree = getAncestry(listMyOrphans = listOrphans)
113 
114  msg.debug("List children of %d (%s)", parent, psTree.get(parent, []))
115  children = []
116  if parent in psTree:
117  children.extend(psTree[parent])
118  for child in psTree[parent]:
119  children.extend(listChildren(psTree, child))
120  children.reverse()
121  return children
122 
123 
124 
132 def infanticide(childPIDs = None, sleepTime = 3, message = True, listOrphans = False):
133  if childPIDs is None:
134  childPIDs = listChildren(listOrphans = listOrphans)
135 
136  if len(childPIDs) > 0 and message:
137  msg.info('Killing these child processes: {0}...'.format(childPIDs))
138 
139  for pid in childPIDs:
140  try:
141  os.kill(pid, signal.SIGTERM)
142  except OSError:
143  pass
144 
145  time.sleep(sleepTime)
146 
147  for pid in childPIDs:
148  try:
149  os.kill(pid, signal.SIGKILL)
150  except OSError:
151  # OSError happens when the process no longer exists - harmless
152  pass
153 
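A typical cleanup sketch combining listChildren() and infanticide():

    from PyJobTransforms.trfUtils import infanticide, listChildren

    # List descendants of the current process (deepest first), send each
    # SIGTERM, wait sleepTime seconds, then SIGKILL any survivors
    pids = listChildren()
    infanticide(pids, sleepTime=3)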
154 
155 def call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10):
156 
157  def logProc(p):
158  line=p.stdout.readline()
159  if line:
160  line="%s%s" % (message, line.rstrip())
161  if logger is None:
162  print(line)
163  else:
164  logger.log(loglevel, line)
165 
166  def flushProc(p):
167  line=p.stdout.readline()
168  while line:
169  line="%s%s" % (message, line.strip())
170  if logger is None:
171  print(line)
172  else:
173  logger.log(loglevel, line)
174  line=p.stdout.readline()
175 
176  if loglevel is None:
177  loglevel=logging.DEBUG
178 
179  if timeout is None or timeout<=0: # no timeout set
180  msg.info('Executing %s...', args)
181  starttime = time.time()
182  p=Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
183  while p.poll() is None:
184  logProc(p)
185  flushProc(p)
186  if timeout is not None:
187  msg.info('Executed call within %d s.', time.time()-starttime)
188  return p.returncode
189 
190  else: #timeout set
191  n=0
192  while n<=retry:
193  msg.info('Try %i out of %i (time limit %ss) to call %s.', n+1, retry+1, timeout, args)
194  starttime = time.time()
195  endtime=starttime+timeout
196  p=Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
197  while p.poll() is None and time.time()<endtime:
198  logProc(p)
199  if p.poll() is None:
200  msg.warning('Timeout limit of %d s reached. Kill subprocess and its children.', timeout)
201  parent=p.pid
202  pids=[parent]
203  pids.extend(listChildren(parent=parent))
204  infanticide(pids)
205  msg.info('Checking if something is left in buffer.')
206  flushProc(p)
207  if n!=retry:
208  msg.info('Going to sleep for %d s.', sleeptime)
209  time.sleep(sleeptime)
210  n+=1
211  timeout*=timefactor
212  sleeptime*=timefactor
213  else:
214  flushProc(p)
215  msg.info('Executed call within %d s.', time.time()-starttime)
216  return p.returncode
217 
218  msg.warning('All %i tries failed!', n)
219  raise Exception
220 
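A minimal sketch of the retrying wrapper (the command is arbitrary):

    from PyJobTransforms.trfUtils import call

    # Run with a 60 s limit and one retry; on each retry both timeout and
    # sleeptime are scaled by timefactor. Returns the subprocess exit code,
    # or raises Exception once all tries have failed.
    rc = call(['ls', '-l'], message='ls: ', timeout=60, retry=1)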
221 
222 
223 def asetupReport():
224  setupMsg = str()
225  eVars = ['AtlasBaseDir', 'AtlasProject', 'AtlasVersion', 'AtlasPatch', 'AtlasPatchVersion', 'CMTCONFIG', 'TestArea']
226  if "AtlasProject" in os.environ:
227  CMake_Platform = "{0}_PLATFORM".format(os.environ["AtlasProject"])
228  if CMake_Platform in os.environ:
229  eVars.remove("CMTCONFIG")
230  eVars.append(CMake_Platform)
231  for eVar in eVars:
232  if eVar in os.environ:
233  setupMsg += '\t%s=%s\n' % (eVar, os.environ[eVar])
234  # Look for patches so that the job can be rerun
235  if 'WorkDir_DIR' in os.environ and os.access(os.environ['WorkDir_DIR'], os.R_OK):
236  pass
237  # lstags is obsolete with git releases.
238  # setupMsg += "\n\tPatch packages are:\n"
239  # try:
240  # cmd = ['lstags']
241  # lstagsOut = Popen(cmd, shell = False, stdout = PIPE, stderr = STDOUT, bufsize = 1).communicate()[0]
242  # setupMsg += "\n".join([ "\t\t{0}".format(pkg) for pkg in lstagsOut.decode().split("\n") ])
243  # except (CalledProcessError, OSError) as e:
244  # setupMsg += 'Execution of lstags failed: {0}'.format(e)
245  else:
246  setupMsg+= "No readable patch area found"
247 
248  return setupMsg.rstrip()
249 
250 
251 
261 def releaseIsOlderThan(major, minor=None):
262  if 'AtlasVersion' not in os.environ or 'AtlasBaseDir' not in os.environ:
263  msg.warning("Could not find 'AtlasVersion' and 'AtlasBaseDir' in the environment - no release match possible")
264  return False
265  try:
266  # First try AtlasVersion, which is clean
267  relRegExp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
268  relMatch = re.match(relRegExp, os.environ['AtlasVersion'])
269  if not relMatch:
270  # Now try the final part of AtlasBaseDir
271  leafDir = path.basename(os.environ['AtlasBaseDir'])
272  relMatch = re.match(relRegExp, leafDir)
273  if not relMatch:
274  msg.info('No identifiable numbered release found from AtlasVersion or AtlasBaseDir - assuming dev/devval/mig')
275  return False
276 
277  relmajor = int(relMatch.group('major'))
278  relminor = int(relMatch.group('minor'))
279  msg.info('Detected release major {0}, minor {1} (.{2}) from environment'.format(relmajor, relminor, relMatch.group('other')))
280 
281  # Major beats minor, so test this first
282  if relmajor < major:
283  return True
284  if relmajor > major:
285  return False
286 
287  # First case is major equality and don't care about minor
288  if minor is None or relminor >= minor:
289  return False
290  return True
291 
292  except Exception as e:
293  msg.warning('Exception thrown when attempting to detect athena version ({0}). No release check possible'.format(e))
294  return False
295 
296 
297 
303 def asetupReleaseIsOlderThan(asetup_string, major, minor=None):
304  try:
305  relmajor = None
306  relminor = None
307 
308  # First split the asetup_string by comma
309  split_string = asetup_string.split(',')
310  # master is always the newest
311  if 'master' in split_string:
312  return False
313 
314  # First try major.minor.bugfix
315  reg_exp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
316  for part in split_string:
317  part = part.strip()
318  match = re.match(reg_exp, part)
319  if match:
320  relmajor = int(match.group('major'))
321  relminor = int(match.group('minor'))
322  msg.info('Detected asetup release {0}.{1}(.{2})'.format(relmajor, relminor, match.group('other')))
323  break
324 
325  # Then try major.minor
326  if relmajor is None:
327  reg_exp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)')
328  for part in split_string:
329  part = part.strip()
330  match = re.match(reg_exp, part)
331  if match:
332  relmajor = int(match.group('major'))
333  relminor = int(match.group('minor'))
334  msg.info('Detected asetup release {0}.{1}'.format(relmajor, relminor))
335  break
336 
337  # Bail out
338  if relmajor is None:
339  raise RuntimeError('asetup version could not be parsed')
340 
341  # Major beats minor, so test this first
342  if relmajor < major:
343  return True
344  if relmajor > major:
345  return False
346 
347  # First case is major equality and don't care about minor
348  if minor is None or relminor >= minor:
349  return False
350  return True
351 
352  except Exception as e:
353  msg.warning('Exception thrown when attempting to detect asetup athena version ({0}) from {1}. No release check possible'.format(e, asetup_string))
354  return False
355 
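Both helpers deliberately return False whenever no version can be parsed, so a usage sketch looks like:

    from PyJobTransforms.trfUtils import asetupReleaseIsOlderThan, releaseIsOlderThan

    releaseIsOlderThan(22, 0)                       # depends on the runtime environment
    asetupReleaseIsOlderThan('Athena,22.0.50', 23)  # True: major 22 < 23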
356 
357 
361 def shQuoteStrings(strArray = sys.argv):
362  return [ "'" + qstring.replace("'", "\\'") + "'" for qstring in strArray ]
363 
364 
365 
372 def lineByLine(filename, strip=True, removeTimestamp=True, substepName=None):
373  linecounter = 0
374  encargs = {'encoding' : 'utf8'}
375  f = open(filename, 'r', **encargs)
376  for line in f:
377  linecounter += 1
378  if substepName and isinstance(substepName, str): # Remove substepName only if caller provides that string.
379  line = line.lstrip(substepName)
380  if removeTimestamp:
381  line = line.lstrip('0123456789:-, ') # Remove timestamps in both serial and MP mode.
382  if strip:
383  line = line.strip()
384  yield line, linecounter
385  f.close()
386 
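A usage sketch (the log file name is hypothetical):

    from PyJobTransforms.trfUtils import lineByLine

    # Yields (line, lineNumber) pairs with leading timestamps stripped
    for line, lineNo in lineByLine('log.EVNTtoHITS'):
        if 'ERROR' in line:
            print(lineNo, line)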
387 
388 
397 def prettyXML(element, indent = ' ', poolFileCatalogFormat = False):
398  # Use minidom for pretty printing
399  # See http://broadcast.oreilly.com/2010/03/pymotw-creating-xml-documents.html
400  xmlstring = ElementTree.tostring(element, 'utf-8')
401  try:
402  metadataDoc = minidom.parseString(xmlstring)
403  except ExpatError:
404  # Getting weird \x00 NULLs on the end of some GUIDs, which minidom.parsestring does not like (is this APR?)
405  msg.warning('Error parsing ElementTree string - will try removing hex literals ({0!r})'.format(xmlstring))
406  xmlstring = xmlstring.replace(b'\x00', b'') # tostring() returned bytes, so strip the NULLs as bytes
407  metadataDoc = minidom.parseString(xmlstring)
408 
409 
410  if poolFileCatalogFormat is False:
411  return metadataDoc.toprettyxml(indent=indent, encoding='UTF-8')
412 
413  # Now create a new document with the correct doctype for classic POOLFILECATALOG
414  # See http://stackoverflow.com/questions/2337285/set-a-dtd-using-minidom-in-python
415  imp = minidom.DOMImplementation()
416  doctype = imp.createDocumentType(qualifiedName='POOLFILECATALOG', publicId='', systemId='InMemory')
417  doc = imp.createDocument(None, 'POOLFILECATALOG', doctype)
418 
419  # Cut and paste the parsed document into the new one
420  # See http://stackoverflow.com/questions/1980380/how-to-render-a-doctype-with-pythons-xml-dom-minidom
421  refel = doc.documentElement
422  for child in metadataDoc.childNodes:
423  if child.nodeType==child.ELEMENT_NODE:
424  doc.replaceChild(doc.importNode(child, True), doc.documentElement)
425  refel= None
426  elif child.nodeType!=child.DOCUMENT_TYPE_NODE:
427  doc.insertBefore(doc.importNode(child, True), refel)
428 
429  return doc.toprettyxml(indent=indent, encoding='UTF-8')
430 
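For example, a sketch with a hypothetical GUID:

    from xml.etree import ElementTree
    from PyJobTransforms.trfUtils import prettyXML

    root = ElementTree.Element('POOLFILECATALOG')
    ElementTree.SubElement(root, 'File', ID='hypothetical-guid')
    # Returns UTF-8 encoded bytes; poolFileCatalogFormat=True would prepend
    # the classic POOLFILECATALOG doctype
    print(prettyXML(root, indent='  ').decode())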
431 
432 
434 def isodate():
435  return datetime.now().replace(microsecond=0).isoformat()
436 
437 
438 
443 def forceToAlphaNum(string):
444  if string is None or string.isalnum():
445  return string
446  newstring = ''
447  for piece in string:
448  if piece.isalnum():
449  newstring += piece
450  msg.warning("String {0} was stripped to alphanumeric characters only: {1}".format(string, newstring))
451  return newstring
452 
453 
454 
464 def cmpMetadata(metadata1, metadata2, guidCheck = 'valid'):
465  # First check we have the same files
466  allFiles = set(metadata1) | set(metadata2)
467  if len(allFiles) > len(metadata1) or len(allFiles) > len(metadata2):
468  msg.warning('In metadata comparison file lists are not equal - fails ({0} != {1})'.format(metadata1, metadata2))
469  return False
470  for fname in allFiles:
471  allKeys = set(metadata1[fname]) | set(metadata2[fname])
472  if len(allKeys) > len(metadata1[fname]) or len(allKeys) > len(metadata2[fname]):
473  msg.warning('In metadata comparison key lists are not equal - fails')
474  return False
475  for key in allKeys:
476  if key == 'file_guid':
477  if guidCheck == 'ignore':
478  continue
479  elif guidCheck == 'equal':
480  if metadata1[fname]['file_guid'].upper() == metadata2[fname]['file_guid'].upper():
481  continue
482  else:
483  msg.warning('In metadata comparison strict GUID comparison failed.')
484  return False
485  elif guidCheck == 'valid':
486  try:
487  uuid.UUID(metadata1[fname]['file_guid'])
488  uuid.UUID(metadata2[fname]['file_guid'])
489  continue
490  except ValueError:
491  msg.warning('In metadata comparison found invalid GUID strings.')
492  return False
493  if metadata1[fname][key] != metadata2[fname][key]:
494  msg.warning('In metadata comparison found different key values: {0!s} != {1!s}'.format(metadata1[fname][key], metadata2[fname][key]))
495  return True
496 
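For example, with the default guidCheck='valid' the two GUIDs below only need to be well-formed UUIDs for the comparison to pass:

    from PyJobTransforms.trfUtils import cmpMetadata

    md1 = {'f.root': {'nentries': 100, 'file_guid': '0550478C-FF6A-6F41-9D89-A6D2F4C38E3B'}}
    md2 = {'f.root': {'nentries': 100, 'file_guid': 'D65B1109-EE8D-B040-B7F1-0C5426712B13'}}
    cmpMetadata(md1, md2)   # True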
497 
498 
501 def unpackTarFile(filename, directory="."):
502  try:
503  tar = tarfile.open(filename)
504  tar.extractall(path=directory)
505  tar.close()
506  except Exception as e:
507  errMsg = 'Error encountered while unpacking {0} to {1}: {2}'.format(filename, directory, e)
508  msg.error(errMsg)
509  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'), errMsg)
510 
511 
512 
520 def unpackDBRelease(tarball, dbversion=None):
521  if dbversion is None:
522  dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(tarball))
523  if dbdMatch is None:
524  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
525  'Could not find a valid version in the DBRelease tarball: {0}'.format(tarball))
526  dbversion = dbdMatch.group(1)
527  dbsetup = path.abspath(path.join("DBRelease", dbversion, "setup.py"))
528  if os.access(dbsetup, os.R_OK):
529  msg.debug('DBRelease {0} is already unpacked, found {1}'.format(tarball, dbsetup))
530  return False, dbsetup
531  else:
532  msg.debug('Will attempt to unpack DBRelease {0}'.format(tarball))
533  unpackTarFile(tarball)
534  msg.info('DBRelease {0} was unpacked'.format(tarball))
535  if not os.access(dbsetup, os.R_OK):
536  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
537  'DBRelease setup file {0} was not readable, even after unpacking {1}'.format(dbsetup, tarball))
538  return True, dbsetup
539 
540 
543 def setupDBRelease(setup):
544  try:
545  dbdir=path.abspath(path.dirname(setup))
546  msg.debug('Will add {0} to sys.path to load DBRelease setup module'.format(dbdir))
547  # N.B. We cannot use __import__ because the X.Y.Z directory name is illegal for a python module path
548  opath = list(sys.path) # take a copy, so the original path can be restored below
549  sys.path.insert(0, dbdir)
550  from setup import Setup
551  # Instantiate the Setup module, which activates the customisation
552  Setup(dbdir)
553  sys.path = opath
554  msg.debug('DBRelease setup module was initialised successfully')
555  except ImportError as e:
556  errMsg = 'Import error while trying to load DB Setup module: {0}'.format(e)
557  msg.error(errMsg)
558  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'), errMsg)
559  except Exception as e:
560  errMsg = 'Unexpected error while trying to load DB Setup module: {0}'.format(e)
561  msg.error(errMsg)
562  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'), errMsg)
563 
564 
565 
569 def cvmfsDBReleaseCheck(dbrelease):
570  dbsetup = None
571  dbdMatch = re.match(r'([\d\.]+|current)$', dbrelease)
572  msg.debug('Attempting to setup DBRelease {0} from cvmfs'.format(dbrelease))
573  if dbdMatch:
574  if 'VO_ATLAS_SW_DIR' in os.environ:
575  msg.debug('Found site defined path to ATLAS software: {0}'.format(os.environ['VO_ATLAS_SW_DIR']))
576  dbsetup = path.join(os.environ['VO_ATLAS_SW_DIR'], 'database', 'DBRelease', dbrelease, 'setup.py')
577  if os.access(dbsetup, os.R_OK):
578  return dbsetup
579  msg.warning('Site defined path to ATLAS software seems invalid (failed to access {0}). Will also try standard cvmfs path.'.format(dbsetup))
580  else:
581  msg.debug('Using standard CVMFS path to ATLAS software')
582 
583  dbsetup = path.join('/cvmfs/atlas.cern.ch/repo/sw/database/DBRelease', dbrelease, 'setup.py')
584  if not os.access(dbsetup, os.R_OK):
585  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
586  'CVMFS DBRelease setup file {0} was not readable'.format(dbsetup))
587  msg.debug('Using cvmfs based dbrelease: {0}'.format(path.dirname(dbsetup)))
588  else:
589  raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_DBRELEASE_PROBLEM'),
590  'Unable to interpret DBRelease "{0}" as either a tarball or a CVMFS release directory'.format(dbrelease))
591  return dbsetup
592 
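A sketch of the typical DBRelease resolution flow (the version string is hypothetical):

    from PyJobTransforms.trfUtils import cvmfsDBReleaseCheck, setupDBRelease

    # Both steps raise TransformSetupException on an unusable release
    dbsetup = cvmfsDBReleaseCheck('31.8.1')
    setupDBRelease(dbsetup)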
593 
594 
598 def pickledDump(argdict):
599  if 'dumpPickle' not in argdict:
600  return
601 
602  from PyJobTransforms.trfArgClasses import argument
603  theArgumentDictionary = {}
604  for k, v in argdict.items():
605  if k == 'dumpPickle':
606  continue
607  if isinstance(v, argument):
608  theArgumentDictionary[k] = getattr(v, "dumpvalue", v.value)
609  else:
610  theArgumentDictionary[k] = v
611  with open(argdict['dumpPickle'], 'wb') as pickleFile:
612  import pickle
613  pickle.dump(theArgumentDictionary, pickleFile)
614 
615 
616 
617 def JSONDump(argdict):
618  if 'dumpJSON' not in argdict:
619  return
620 
621  from PyJobTransforms.trfArgClasses import argument
622  theArgumentDictionary = {}
623  for k, v in argdict.items():
624  if k == 'dumpJSON':
625  continue
626  if isinstance(v, argument):
627  theArgumentDictionary[k] = getattr(v, "dumpvalue", v.value)
628  else:
629  theArgumentDictionary[k] = v
630  with open(argdict['dumpJSON'], 'w') as JSONFile:
631  import json
632  json.dump(theArgumentDictionary, JSONFile, sort_keys=True, indent=2)
633 
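A usage sketch (the file name and argument are hypothetical); plain values pass straight through, while trfArgClasses argument instances are serialised via their value (or dumpvalue):

    from PyJobTransforms.trfUtils import JSONDump

    JSONDump({'dumpJSON': 'args.json', 'maxEvents': 10})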
634 
636 def convertToStr(in_string):
637  if isinstance(in_string, dict):
638  return dict([(convertToStr(key), convertToStr(value)) for key, value in in_string.items()])
639  elif isinstance(in_string, list):
640  return [convertToStr(element) for element in in_string]
641  # Unicode is always str in Python3, but bytes are not
642  # TODO: remove unicode comparison after Python 3 migration
643  elif in_string.__class__.__name__ == 'unicode':
644  return in_string.encode('utf-8')
645  elif in_string.__class__.__name__ == 'bytes':
646  return in_string.decode('utf-8')
647  else:
648  return in_string
649 
650 
651 
652 def cliToKey(option):
653  return option.lstrip('-').replace('-', '_')
654 
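For example:

    cliToKey('--output-file')   # 'output_file'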
655 
656 
660 def printHR(the_object):
661  # dictionary
662  if isinstance(the_object, dict):
663  for key, value in sorted(the_object.items()):
664  print(u'{key}: {value}'.format(key = key, value = value))
665  # list or tuple
666  elif isinstance(the_object, list) or isinstance(the_object, tuple):
667  for element in the_object:
668  print(element)
669  # other
670  else:
671  print(the_object)
672 
673 
674 
678 def uniqueIdentifier():
679  return base64.urlsafe_b64encode(uuid.uuid4().bytes).strip(b"=").decode() # strip the bytes padding, then decode to str
680 
681 
682 
691 def units(
692  quantity = None,
693  unitSingular = "unit",
694  unitPlural = "units"
695  ):
696  if quantity == 1:
697  return unitSingular
698  else:
699  return unitPlural
700 
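For example:

    units(quantity=1, unitSingular='job', unitPlural='jobs')   # 'job'
    units(quantity=5, unitSingular='job', unitPlural='jobs')   # 'jobs'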
701 
702 # @brief returns if the current job is running in interactive environment.
703 def isInteractiveEnv():
704  isInteractiveEnv = False
705  # PS1 is for sh, bash; prompt is for tcsh and zsh
706  if 'PS1' in os.environ or 'prompt' in os.environ:
707  isInteractiveEnv = True
708  elif os.isatty(sys.stdout.fileno()) or os.isatty(sys.stdin.fileno()):
709  isInteractiveEnv = True
710 
711  return isInteractiveEnv
712 
713 
714 
723 class Job(object):
724 
725 
726  def __init__(
727  self,
728  workFunction = None,
729  workFunctionKeywordArguments = {},
730  workFunctionTimeout = None,
731  name = None,
732  ):
733  self.workFunction = workFunction
734  self.workFunctionKeywordArguments = workFunctionKeywordArguments
735  self.workFunctionTimeout = workFunctionTimeout
736  self.className = self.__class__.__name__
737  self.resultGetter = None
738  if name is None:
739  self._name = uniqueIdentifier()
740  else:
741  self._name = name
742  if self.workFunction is None:
743  exceptionMessage = "work function not specified"
744  msg.error("{notifier}: exception message: {exceptionMessage}".format(
745  notifier = self.className,
746  exceptionMessage = exceptionMessage
747  ))
748  raise trfExceptions.TransformInternalException(
749  trfExit.nameToCode('TRF_INTERNAL'),
750  exceptionMessage
751  )
752 
753  @property
754  def name(self):
755  return self._name
756 
757 
761  def __str__(self):
762  descriptionString = ""
763  for key, value in sorted(vars(self).items()):
764  descriptionString += str("{key}:{value} ".format(
765  key = key,
766  value = value)
767  )
768  return descriptionString
769 
770 
773  def printout(self):
774  printHR(vars(self))
775 
776 
777 
787 class JobGroup(object):
788 
789 
790  def __init__(
791  self,
792  jobs = None,
793  name = None,
794  timeout = None
795  ):
796  self.jobs = jobs
797  self.className = self.__class__.__name__
798  self.completeStatus = False
799  self.timeStampSubmission = None
800  if name is None:
801  self._name = uniqueIdentifier()
802  else:
803  self._name = name
804  #self.timeStampSubmissionComplete = None #delete
805  if timeout is None:
806  self.timeout = 0
807  for job in self.jobs:
808  self.timeout += job.workFunctionTimeout
809  self.results = []
810 
811  @property
812  def name(self):
813  return self._name
814 
815 
819  def __str__(self):
820  descriptionString = ""
821  for key, value in sorted(vars(self).items()):
822  descriptionString += str("{key}:{value} ".format(
823  key = key,
824  value = value)
825  )
826  return descriptionString
827 
828 
835  def timeoutStatus(self):
836  # If the JobGroup is complete or not submitted, then it is not timed
837  # out.
838  if self.completeStatus is True or self.timeStampSubmission is None:
839  return False
840  # If the JobGroup is not complete or submitted, then it may be timed
841  # out.
842  elif time.time() > self.timeout + self.timeStampSubmission:
843  return True
844  else:
845  return False
846 
847 
850  def printout(self):
851  printHR(vars(self))
852 
853 
854 
855 def initialise_processes():
856  # Multiprocessing uses signals to communicate with subprocesses, so the
857  # following two lines prevent the transforms signal handlers from
858  # interfering:
859  from PyJobTransforms.trfSignal import resetTrfSignalHandlers
860  resetTrfSignalHandlers()
861  signal.signal(signal.SIGINT, signal.SIG_IGN)
862 
863 
864 
867 class ParallelJobProcessor(object):
868 
869 
873  def __init__(
874  self,
875  jobSubmission = None,
876  numberOfProcesses = multiprocessing.cpu_count(), # noqa: B008 (cpu_count is constant)
877  ):
878  self.jobSubmission = jobSubmission
879  self.numberOfProcesses = numberOfProcesses
880  self.className = self.__class__.__name__
881  self.status = "starting"
882  msg.debug("{notifier}: status: {status}".format(
883  notifier = self.className,
884  status = self.status)
885  )
886  self.countOfJobs = None
887  self.countOfRemainingJobs = 0
888  self.pool = multiprocessing.Pool(
889  self.numberOfProcesses,
890  initialise_processes
891  )
892  msg.debug("{notifier}: pool of {numberOfProcesses} {units} created".format(
893  notifier = self.className,
894  numberOfProcesses = str(self.numberOfProcesses),
895  units = units(quantity = self.numberOfProcesses,
896  unitSingular = "process", unitPlural = "processes")
897  ))
898  self.status = "ready"
899  msg.debug("{notifier}: status: {status}".format(
900  notifier = self.className,
901  status = self.status
902  ))
903 
904 
908  def __str__(self):
909  descriptionString = ""
910  for key, value in sorted(vars(self).items()):
911  descriptionString += str("{key}:{value} ".format(
912  key = key,
913  value = value
914  ))
915  return descriptionString
916 
917 
920  def printout(self):
921  printHR(vars(self)
922  )
923 
924 
928  def submit(
929  self,
930  jobSubmission = None
931  ):
932  # If the input submission is not None, then update the jobSubmission
933  # data attribute to that specified for this method.
934  if jobSubmission is not None:
935  self.jobSubmission = jobSubmission
936  self.status = "submitted"
937  msg.debug("{notifier}: status: {status}".format(
938  notifier = self.className,
939  status = self.status
940  ))
941  # If the input submission is a Job object, contain it in a JobGroup
942  # object.
943  if isinstance(self.jobSubmission, Job):
944  jobGroup = JobGroup(
945  jobs = [self.jobSubmission,],
946  )
947  self.jobSubmission = jobGroup
948  # Count the number of jobs.
949  self.countOfJobs = len(self.jobSubmission.jobs)
950  self.countOfRemainingJobs = self.countOfJobs
951  # Build a contemporary list of the names of jobs.
952  self.listOfNamesOfRemainingJobs = []
953  for job in self.jobSubmission.jobs:
954  self.listOfNamesOfRemainingJobs.append(job.name)
955  msg.debug("{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
956  notifier = self.className,
957  name = self.jobSubmission.name,
958  countOfJobs = self.countOfJobs,
959  units = units(
960  quantity = self.countOfRemainingJobs,
961  unitSingular = "job",
962  unitPlural = "jobs"
963  )
964  ))
965  msg.debug(self.statusReport())
966  msg.debug("{notifier}: submitting job group submission '{name}' to pool".format(
967  notifier = self.className,
968  name = self.jobSubmission.name
969  ))
970  # Cycle through all jobs in the input submission and apply each to the
971  # pool.
972  for job in self.jobSubmission.jobs:
973  job.timeStampSubmission = time.time()
974  msg.debug("{notifier}: job '{name}' submitted to pool".format(
975  notifier = self.className,
976  name = job.name
977  ))
978  # Apply the job to the pool, applying the object pool.ApplyResult
979  # to the job as a data attribute.
980  job.resultGetter = self.pool.apply_async(
981  func = job.workFunction,
982  kwds = job.workFunctionKeywordArguments
983  )
984  # Prepare monitoring of job group times in order to detect a job group
985  # timeout by recording the time of complete submission of the job group.
986  self.jobSubmission.timeStampSubmission = time.time()
987  msg.debug("{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
988  notifier = self.className,
989  countOfJobs = self.countOfJobs,
990  units = units(
991  quantity = self.countOfJobs,
992  unitSingular = "job",
993  unitPlural = "jobs"
994  ),
995  timeStampSubmission = self.jobSubmission.timeStampSubmission
996  ))
997  self.status = "processing"
998  msg.debug("{notifier}: status: {status}".format(
999  notifier = self.className,
1000  status = self.status
1001  ))
1002  return 0
1003 
1004 
1008  def getResults(self):
1009  # While the number of jobs remaining is greater than zero, cycle over
1010  # all jobs in the JobGroup object submission, watching for a
1011  # timeout of the JobGroup object submission. If a result has not been
1012  # retrieved for a job (i.e. the Job object does not have a result data
1013  # attribute), then check if a result is available for the job (using the
1014  # method multiprocessing.pool.AsyncResult.ready()). If a result is
1015  # available for the job, then check if the job has run successfully
1016  # (using the method multiprocessing.pool.AsyncResult.successful()). If
1017  # the job has not been successful, raise an exception, otherwise, get
1018  # the result of the job and save it to the result data attribute of the
1019  # job.
1020  msg.debug("{notifier}: checking for job {units}".format(
1021  notifier = self.className,
1022  units = units(
1023  quantity = self.countOfRemainingJobs,
1024  unitSingular = "result",
1025  unitPlural = "results")
1026  )
1027  )
1028  while self.countOfRemainingJobs > 0:
1029  # Check for timeout of the job group. If the current timestamp is
1030  # greater than the job group timeout (derived from the sum of the
1031  # set of all job timeout specifications in the job group) + the job
1032  # group submission timestamp, then raise an exception, otherwise
1033  # cycle over all jobs.
1034  # Allow time for jobs to complete.
1035  time.sleep(0.25)
1036  if self.jobSubmission.timeoutStatus():
1037  msg.error("{notifier}: job group '{name}' timed out".format(
1038  notifier = self.className,
1039  name = self.jobSubmission.name
1040  ))
1041  self._abort()
1042  exceptionMessage = "timeout of a function in list {listOfNamesOfRemainingJobs}".format(
1043  listOfNamesOfRemainingJobs = self.listOfNamesOfRemainingJobs
1044  )
1045  msg.error("{notifier}: exception message: {exceptionMessage}".format(
1046  notifier = self.className,
1047  exceptionMessage = exceptionMessage
1048  ))
1049  raise trfExceptions.TransformTimeoutException(
1050  trfExit.nameToCode('TRF_EXEC_TIMEOUT'),
1051  exceptionMessage
1052  )
1053  else:
1054  self.listOfNamesOfRemainingJobs = []
1055  for job in self.jobSubmission.jobs:
1056  if not hasattr(job, 'result'):
1057  # Maintain a contemporary list of the names of remaining
1058  # jobs.
1059  self.listOfNamesOfRemainingJobs.append(job.name)
1060  # If the result of the job is ready...
1061  if job.resultGetter.ready():
1062  msg.debug(
1063  "{notifier}: result ready for job '{name}'".format(
1064  notifier = self.className,
1065  name = job.name
1066  )
1067  )
1068  job.successStatus = job.resultGetter.successful()
1069  msg.debug(
1070  "{notifier}: job '{name}' success status: {successStatus}".format(
1071  notifier = self.className,
1072  name = job.name,
1073  successStatus = job.successStatus
1074  )
1075  )
1076  # If the job was successful, create the result data
1077  # attribute of the job and save the result to it.
1078  if job.successStatus:
1079  job.result = job.resultGetter.get()
1080  msg.debug(
1081  "{notifier}: result of job '{name}': {result}".format(
1082  notifier = self.className,
1083  name = job.name,
1084  result = job.result
1085  )
1086  )
1087  self.countOfRemainingJobs -= 1
1088  msg.debug(
1089  "{notifier}: {countOfRemainingJobs} {units} remaining".format(
1090  notifier = self.className,
1091  countOfRemainingJobs = self.countOfRemainingJobs,
1092  units = units(
1093  quantity = self.countOfRemainingJobs,
1094  unitSingular = "job",
1095  unitPlural = "jobs"
1096  )
1097  )
1098  )
1099  # If the job was not successful, raise an exception
1100  # and abort processing.
1101  elif not job.successStatus:
1102  msg.error(
1103  "{notifier}: job '{name}' failed".format(
1104  notifier = self.className,
1105  name = job.name
1106  )
1107  )
1108  self._abort()
1109  exceptionMessage = "failure of function '{name}' with arguments {arguments}".format(
1110  name = job.workFunction.__name__,
1111  arguments = job.workFunctionKeywordArguments
1112  )
1113  msg.error("{notifier}: exception message: {exceptionMessage}".format(
1114  notifier = self.className,
1115  exceptionMessage = exceptionMessage
1116  ))
1117  raise trfExceptions.TransformExecutionException(
1118  trfExit.nameToCode('TRF_EXEC_FAIL'),
1119  exceptionMessage
1120  )
1121  # All results having been returned, create the 'results' list data
1122  # attribute of the job group and append all individual job results to
1123  # it.
1124  self.jobSubmission.timeStampComplete = time.time()
1125  self.jobSubmission.completeStatus = True
1126  msg.debug("{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
1127  notifier = self.className,
1128  countOfJobs = self.countOfJobs,
1129  units = units(
1130  quantity = self.countOfJobs,
1131  unitSingular = "job",
1132  unitPlural = "jobs"
1133  ),
1134  timeStampComplete = self.jobSubmission.timeStampComplete
1135  ))
1136  self.jobSubmission.processingTime = self.jobSubmission.timeStampComplete - self.jobSubmission.timeStampSubmission
1137  msg.debug("{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
1138  notifier = self.className,
1139  countOfJobs = self.countOfJobs,
1140  units = units(
1141  quantity = self.countOfJobs,
1142  unitSingular = "job",
1143  unitPlural = "jobs"
1144  ),
1145  processingTime = self.jobSubmission.processingTime
1146  ))
1147  for job in self.jobSubmission.jobs:
1148  self.jobSubmission.results.append(job.result)
1149  self._terminate()
1150  return self.jobSubmission.results
1151  self._terminate()
1152 
1153 
1157  def statusReport(self):
1158  statusReport = "\n{notifier}:\n status report:".format(
1159  notifier = self.className
1160  )
1161  # information on parallel job processor
1162  statusReport += "\n parallel job processor configuration:"
1163  statusReport += "\n status: {notifier}".format(
1164  notifier = str(self.status)
1165  )
1166  statusReport += "\n number of processes: {notifier}".format(
1167  notifier = str(self.numberOfProcesses)
1168  )
1169  # information on job group submission
1170  statusReport += "\n job group submission: '{notifier}'".format(
1171  notifier = self.jobSubmission.name
1172  )
1173  statusReport += "\n total number of jobs: {notifier}".format(
1174  notifier = str(self.countOfJobs)
1175  )
1176  statusReport += "\n number of incomplete jobs: {notifier}".format(
1177  notifier = str(self.countOfRemainingJobs)
1178  )
1179  statusReport += "\n names of incomplete jobs: {notifier}".format(
1180  notifier = self.listOfNamesOfRemainingJobs
1181  )
1182  # information on jobs (if existent)
1183  if self.jobSubmission.jobs:
1184  statusReport += "\n jobs:"
1185  for job in self.jobSubmission.jobs:
1186  statusReport += "\n job '{name}':".format(
1187  name = job.name
1188  )
1189  statusReport += "\n workFunction: '{name}'".format(
1190  name = job.workFunction.__name__
1191  )
1192  statusReport += "\n workFunctionKeywordArguments: '{arguments}'".format(
1193  arguments = job.workFunctionKeywordArguments
1194  )
1195  statusReport += "\n workFunctionTimeout: '{timeout}'".format(
1196  timeout = job.workFunctionTimeout
1197  )
1198  if hasattr(job, 'result'):
1199  statusReport += "\n result: '{result}'".format(
1200  result = job.result
1201  )
1202  # statistics of parallel job processor run
1203  if hasattr(self.jobSubmission, 'processingTime'):
1204  statusReport += "\n statistics:"
1205  if hasattr(self.jobSubmission, 'processingTime'):
1206  statusReport += "\n total processing time: {processingTime} s".format(
1207  processingTime = self.jobSubmission.processingTime
1208  )
1209  return statusReport
1210 
1211 
1214  def _abort(self):
1215  self.status = "aborting"
1216  msg.debug("{notifier}: status: {status}".format(
1217  notifier = self.className,
1218  status = self.status
1219  ))
1220  self._terminate()
1221 
1222 
1227  def _terminate(self):
1228  self.status = "terminating"
1229  msg.debug("{notifier}: status: {status}".format(
1230  notifier = self.className,
1231  status = self.status
1232  ))
1233  msg.debug("{notifier}: terminating pool of {numberOfProcesses} {units}".format(
1234  notifier = self.className,
1235  numberOfProcesses = str(self.numberOfProcesses),
1236  units = units(
1237  quantity = self.numberOfProcesses,
1238  unitSingular = "process",
1239  unitPlural = "processes"
1240  )
1241  ))
1242  self.pool.terminate()
1243  self.pool.join()
1244  self.status = "finished"
1245  msg.debug("{notifier}: status: {status}".format(
1246  notifier = self.className,
1247  status = self.status
1248  ))
1249  msg.debug(self.statusReport())
1250 
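A minimal end-to-end sketch (helloWorld is a hypothetical work function; it must be defined at module level so that multiprocessing can pickle it, and the __main__ guard matters on platforms that spawn rather than fork):

    import time
    from PyJobTransforms.trfUtils import Job, ParallelJobProcessor

    def helloWorld(name='world'):
        time.sleep(1)
        return 'hello ' + name

    if __name__ == '__main__':
        job = Job(workFunction=helloWorld,
                  workFunctionKeywordArguments={'name': 'ATLAS'},
                  workFunctionTimeout=10)
        pjp = ParallelJobProcessor()
        pjp.submit(jobSubmission=job)   # a bare Job is wrapped in a JobGroup
        print(pjp.getResults())         # ['hello ATLAS']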
1251 
1252 class analytic():
1253 
1254  _fit = None
1255 
1256  def __init__(self, **kwargs):
1257  self._fit = None
1258 
1259 
1264  def fit(self, x, y, model='linear'):
1265  try:
1266  self._fit = Fit(x=x, y=y, model=model)
1267  except Exception as e:
1268  msg.warning('fit failed! {0}'.format(e))
1269 
1270  return self._fit
1271 
1272  # Return the slope of a linear fit, y(x) = slope * x + intersect
1273  def slope(self):
1274  slope = None
1275 
1276  if self._fit:
1277  slope = self._fit.slope()
1278  else:
1279  msg.warning('Fit has not been defined')
1280 
1281  return slope
1282 
1283  # Return a properly formatted job metrics string with analytics data.
1284  # Currently the function returns a fit for 'pss' vs 'time', whose slope measures memory leaks.
1285  # @param filename: memory monitor output file (string).
1286  # @param x_name: optional string, name selector for table column.
1287  # @param y_name: optional string, name selector for table column.
1288  # @param precision: optional precision for fitted slope parameter, default 2.
1289  # @param tails: should tails be used? (boolean).
1290  # @param minPoints: minimum desired number of data points to be fitted (after removing tails)
1291  # @return: {"slope": slope, "chi2": chi2}
1292  def getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5):
1293  _memFileToTable = memFileToTable()
1294  fitResult = {}
1295  table = _memFileToTable.getTable(filename, header=None, separator="\t")
1296  if table:
1297  # extract data to be fitted
1298  x, y = self.extractFromTable(table, x_name, y_name)
1299  # remove tails if desired
1300  # this is useful e.g. for memory monitor data where the first and last values
1301  # represent allocation and de-allocation, ie not interesting
1302  # here tail is defined to be first and last 20% of data
1303  if not tails:
1304  tail = int(len(x)/5)
1305  msg.info('removing tails from the memory monitor data; 20% from each side')
1306  x = x[tail:]
1307  x = x[:-tail] if tail > 0 else x # guard: [:-0] would empty the list
1308  y = y[tail:]
1309  y = y[:-tail] if tail > 0 else y
1310 
1311  if len(x)==len(y) and len(x) > minPoints:
1312  msg.info('fitting {0} vs {1}'.format(y_name, x_name))
1313  try:
1314  fit = self.fit(x, y)
1315  _slope = self.slope()
1316  except Exception as e:
1317  msg.warning('failed to fit data, x={0}, y={1}: {2}'.format(x, y, e))
1318  else:
1319  if _slope:
1320  slope = round(fit.slope(), precision)
1321  chi2 = round(fit.chi2(), precision)
1322  fitResult = {"slope": slope, "chi2": chi2}
1323  if slope:
1324  HRslope, unit = self.formatBytes(slope)
1325  msg.info('slope of the fitted line: {0} {1} (using {2} data points, chi2={3})'.format(HRslope, unit, len(x), chi2))
1326  else:
1327  msg.warning('wrong length of table data, x={0}, y={1} (must be same and length>={2})'.format(x, y, minPoints))
1328 
1329  return fitResult
1330 
1331  # Extract wanted columns. e.g. x: Time , y: pss+swap
1332  # @param x_name: column name to be extracted (string).
1333  # @param y_name: column name to be extracted (may contain '+'-sign) (string).
1334  # @return: x (list), y (list).
1335  def extractFromTable(self, table, x_name, y_name):
1336  headerUpperVersion = {'pss':'PSS', 'swap':'Swap', 'rss':'RSS', 'vmem':'VMEM'}
1337  x = table.get(x_name, [])
1338  if '+' not in y_name:
1339  y = table.get(y_name, [])
1340  if len(y)==0:
1341  y = table.get(headerUpperVersion[y_name], [])
1342  else:
1343  try:
1344  y1_name = y_name.split('+')[0]
1345  y2_name = y_name.split('+')[1]
1346  y1_value = table.get(y1_name, [])
1347  y2_value = table.get(y2_name, [])
1348  if len(y1_value)==0 or len(y2_value)==0:
1349  y1_value = table.get(headerUpperVersion[y1_name], [])
1350  y2_value = table.get(headerUpperVersion[y2_name], [])
1351  except Exception as e:
1352  msg.warning('exception caught: {0}'.format(e))
1353  x = []
1354  y = []
1355  else:
1356  # create new list with added values (1,2,3) + (4,5,6) = (5,7,9)
1357  y = [x0 + y0 for x0, y0 in zip(y1_value, y2_value)]
1358 
1359  return x, y
1360 
1361  # Make the result of slope human readable (HR)
1362  # default unit is KB
1363  def formatBytes(self, size):
1364  # decimal system
1365  power = 1000
1366  n = 1
1367  power_labels = {1: 'K', 2: 'M', 3: 'G', 4: 'T'}
1368  while size > power:
1369  size /= power
1370  n += 1
1371  return round(size, 2), power_labels[n]+'B/s'
1372 
1373 
1374 
1375 class Fit():
1376  _model = 'linear' # fitting model
1377  _x = None # x values
1378  _y = None # y values
1379  _xm = None # x mean
1380  _ym = None # y mean
1381  _ss = None # sum of square deviations
1382  _ss2 = None # sum of deviations
1383  _slope = None # slope
1384  _intersect = None # intersect
1385  _chi2 = None # chi2
1386 
1387  def __init__(self, **kwargs):
1388  # extract parameters
1389  self._model = kwargs.get('model', 'linear')
1390  self._x = kwargs.get('x', None)
1391  self._y = kwargs.get('y', None)
1392  self._math = math()
1393 
1394  if not self._x or not self._y:
1395  msg.warning('input data not defined')
1396 
1397  if len(self._x) != len(self._y):
1398  msg.warning('input data (lists) have different lengths')
1399 
1400  # base calculations
1401  if self._model == 'linear':
1402  self._ss = self._math.sum_square_dev(self._x)
1403  self._ss2 = self._math.sum_dev(self._x, self._y)
1404  self.set_slope()
1405  self._xm = self._math.mean(self._x)
1406  self._ym = self._math.mean(self._y)
1407  self.set_intersect()
1408  self.set_chi2()
1409 
1410  else:
1411  msg.warning("\'{0}\' model is not implemented".format(self._model))
1412 
1413  def fit(self):
1414  #Return fitting object.
1415  return self
1416 
1417  def value(self, t):
1418  #Return the value y(x=t) of a linear fit y(x) = slope * x + intersect.
1419  return self._slope * t + self._intersect
1420 
1421  def set_chi2(self):
1422  #Calculate and set the chi2 value.
1423  y_observed = self._y
1424  y_expected = []
1425  for x in self._x:
1426  y_expected.append(self.value(x))
1427  if y_observed and y_observed != [] and y_expected and y_expected != []:
1428  self._chi2 = self._math.chi2(y_observed, y_expected)
1429  else:
1430  self._chi2 = None
1431 
1432  def chi2(self):
1433  #Return the chi2 value.
1434  return self._chi2
1435 
1436  def set_slope(self):
1437  #Calculate and set the slope of the linear fit.
1438  if self._ss2 and self._ss and self._ss != 0:
1439  self._slope = self._ss2 / self._ss
1440  else:
1441  self._slope = None
1442 
1443  def slope(self):
1444  #Return the slope value.
1445  return self._slope
1446 
1447  def set_intersect(self):
1448  #Calculate and set the intersect of the linear fit.
1449  if self._ym and self._slope and self._xm:
1450  self._intersect = self._ym - self._slope * self._xm
1451  else:
1452  self._intersect = None
1453 
1454  def intersect(self):
1455  #Return the intersect value.
1456  return self._intersect
1457 
1458 
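For example, fitting the exact line y = 2x + 1:

    from PyJobTransforms.trfUtils import Fit

    f = Fit(x=[1, 2, 3, 4], y=[3, 5, 7, 9], model='linear')
    f.slope()       # 2.0
    f.intersect()   # 1.0
    f.chi2()        # 0.0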
1459 
1460 class math():
1461 
1462  #Return the sample arithmetic mean of data.
1463  def mean(self, data):
1464  n = len(data)
1465  if n < 1:
1466  msg.warning('mean requires at least one data point')
1467  return sum(data)/n
1468 
1469  # Return sum of square deviations of sequence data.
1470  # Sum (x - x_mean)**2
1471  def sum_square_dev(self, data):
1472  c = self.mean(data)
1473  return sum((x - c) ** 2 for x in data)
1474 
1475  # Return sum of deviations of sequence data.
1476  # Sum (x - x_mean)*(y - y_mean)
1477  def sum_dev(self, x, y):
1478  c1 = self.mean(x)
1479  c2 = self.mean(y)
1480  return sum((_x - c1) * (_y - c2) for _x, _y in zip(x, y))
1481 
1482  # Return the chi2 sum of the provided observed and expected values.
1483  def chi2(self, observed, expected):
1484  if 0 in expected:
1485  return 0.0
1486  return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected))
1487 
1488 
1489 
1499 ## @brief Extract a table of data from a txt file.
1500 class memFileToTable(object):
1501 
1502  def getTable(self, filename, header=None, separator="\t"):
1503  tabledict = {}
1504  keylist = []
1505  try:
1506  f = open(filename, 'r')
1507  except Exception as e:
1508  msg.warning("failed to open file: {0}, {1}".format(filename, e))
1509  else:
1510  firstline = True
1511  for line in f:
1512  fields = line.split(separator)
1513  if firstline:
1514  firstline = False
1515  tabledict, keylist = self._defineTableDictKeys(header, fields, separator)
1516  if not header:
1517  continue
1518  # from now on, fill the dictionary fields with the input data
1519  i = 0
1520  for field in fields:
1521  # get the corresponding dictionary key from the keylist
1522  key = keylist[i]
1523  # store the field value in the correct list
1524  tabledict[key].append(float(field))
1525  i += 1
1526  f.close()
1527 
1528  return tabledict
1529 
1530 
1535  def _defineTableDictKeys(self, header, fields, separator):
1536  tabledict = {}
1537  keylist = []
1538 
1539  if not header:
1540  # get the dictionary keys from the header of the file
1541  for key in fields:
1542  # first line defines the header, whose elements will be used as dictionary keys
1543  if key == '':
1544  continue
1545  if key.endswith('\n'):
1546  key = key[:-1]
1547  tabledict[key] = []
1548  keylist.append(key)
1549  else:
1550  # get the dictionary keys from the provided header
1551  keys = header.split(separator)
1552  for key in keys:
1553  if key == '':
1554  continue
1555  if key.endswith('\n'):
1556  key = key[:-1]
1557  tabledict[key] = []
1558  keylist.append(key)
1559 
1560  return tabledict, keylist
1561 
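A usage sketch (the file name is hypothetical):

    from PyJobTransforms.trfUtils import memFileToTable

    # Parse a tab-separated file whose first line is the header; each
    # column comes back as a list of floats keyed by its header name
    table = memFileToTable().getTable('mem_monitor.txt', header=None, separator='\t')
    table.get('Time', [])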
1562 
1563 
1576 def ValgrindCommand(
1577  defaultOptions = True,
1578  extraOptionsList = None,
1579  AthenaSerialisedConfigurationFile = "athenaConf.pkl",
1580  returnFormat = "string"
1581  ):
1582 
1583  # Access Valgrind suppressions files by finding the paths from
1584  # environment variables. Append the files to the Valgrind suppressions
1585  # options.
1586  suppressionFilesAndCorrespondingPathEnvironmentVariables = {
1587  "etc/valgrind-root.supp": "ROOTSYS",
1588  "Gaudi.supp": "DATAPATH",
1589  "oracleDB.supp": "DATAPATH",
1590  "valgrindRTT.supp": "DATAPATH",
1591  "root.supp": "DATAPATH"
1592  }
1593  optionsList = ["valgrind"]
1594  # If default options are not suppressed, use them.
1595  if defaultOptions:
1596  optionsList.append("--num-callers=30")
1597  optionsList.append("--tool=memcheck")
1598  optionsList.append("--leak-check=full")
1599  optionsList.append("--smc-check=all")
1600  # If extra options are specified, append them to the existing options.
1601  if extraOptionsList:
1602  for option in extraOptionsList:
1603  optionsList.append(option)
1604  # Add suppression files and athena commands
1605  for suppressionFile, pathEnvironmentVariable in suppressionFilesAndCorrespondingPathEnvironmentVariables.items():
1606  suppFile = findFile(os.environ[pathEnvironmentVariable], suppressionFile)
1607  if suppFile:
1608  optionsList.append("--suppressions=" + suppFile)
1609  else:
1610  msg.warning("Bad path to suppression file: {sfile}, {path} not defined".format(
1611  sfile = suppressionFile, path = pathEnvironmentVariable)
1612  )
1613  optionsList.append("$(which python)")
1614  optionsList.append("$(which athena.py)")
1615  optionsList.append(AthenaSerialisedConfigurationFile)
1616  # Return the command in the requested format, string (by default) or list.
1617  if returnFormat is None or returnFormat == "string":
1618  return(" ".join(optionsList))
1619  elif returnFormat == "list":
1620  return(optionsList)
1621  else:
1622  print(
1623  "error: invalid Valgrind command format request (requested " +
1624  "format: {format}; valid formats: string, list)".format(
1625  format = returnFormat
1626  ))
1627  raise(Exception)
1628 
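A usage sketch (assumes ROOTSYS and DATAPATH are set, as in a configured Athena environment):

    from PyJobTransforms.trfUtils import ValgrindCommand

    # Default memcheck options plus one extra flag, returned as a string
    cmd = ValgrindCommand(extraOptionsList=['--track-origins=yes'])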
1629 
1642 def VTuneCommand(
1643  defaultOptions = True,
1644  extraOptionsList = None,
1645  AthenaSerialisedConfigurationFile = "athenaConf.pkl",
1646  returnFormat = "string"
1647  ):
1648 
1649  # Access VTune suppressions files by finding the paths from
1650  # environment variables. Append the files to the VTune suppressions
1651  # options.
1652  optionsList = ["vtune"]
1653  # If default options are not suppressed, use them.
1654  if defaultOptions:
1655  optionsList.append("-run-pass-thru=--no-altstack")
1656  optionsList.append("-mrte-mode=native")
1657  # If extra options are specified, append them to the existing options.
1658  isCollectSpecified=False
1659  if extraOptionsList:
1660  for option in extraOptionsList:
1661  optionsList.append(option)
1662  if option.startswith("-collect="):
1663  isCollectSpecified=True
1664  if not isCollectSpecified:
1665  optionsList.append("-collect=hotspots")
1666  optionsList.append("-- $(which python)")
1667  optionsList.append("$(which athena.py)")
1668  optionsList.append(AthenaSerialisedConfigurationFile)
1669  # Return the command in the requested format, string (by default) or list.
1670  if returnFormat is None or returnFormat == "string":
1671  return(" ".join(optionsList))
1672  elif returnFormat == "list":
1673  return(optionsList)
1674  else:
1675  print(
1676  "error: invalid VTune command format request (requested " +
1677  "format: {format}; valid formats: string, list)".format(
1678  format = returnFormat
1679  ))
1680  raise(Exception)
1681 
1682 # calculate cpuTime from os.times() times tuple
1683 def calcCpuTime(start, stop):
1684  cpuTime = None
1685  if start and stop:
1686  cpuTime = reduce(lambda x1, x2: x1+x2, list(map(lambda x1, x2: x2-x1, start[2:4], stop[2:4])))
1687 
1688  return cpuTime
1689 
1690 # calculate wallTime from os.times() times tuple
1691 def calcWallTime(start, stop):
1692  wallTime = None
1693  if start and stop:
1694  wallTime = stop[4] - start[4]
1695 
1696  return wallTime
1697 
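For example:

    import os
    import time
    from PyJobTransforms.trfUtils import calcCpuTime, calcWallTime

    start = os.times()
    time.sleep(1)   # stand-in for real work, e.g. running subprocesses
    stop = os.times()
    calcCpuTime(start, stop)    # children's user+system CPU seconds
    calcWallTime(start, stop)   # elapsed wall-clock seconds (about 1.0)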
1698 def bind_port(host, port):
1699  ret = 0
1700  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1701  try:
1702  s.bind((host, port))
1703  except socket.error as e:
1704  if e.errno == 98:
1705  print("Port %s is already in use" % port)
1706  else:
1707  # something else raised the socket.error exception
1708  print(e)
1709  ret=1
1710  s.close()
1711  return ret
1712 
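For example:

    from PyJobTransforms.trfUtils import bind_port

    # 0 means host:port could be bound (the port is free);
    # a nonzero return means the bind failed
    if bind_port('localhost', 8000) == 0:
        print('port 8000 is free')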
1713 
1716 def reportEventsPassedSimFilter(log):
1717 
1718  # Currently the pattern which contains the information for passed events is for example like:
1719  # ISF_SimEventFilter INFO accepted 1 out of 10 events for filter ISF_SimEventFilter (SimEventFilter)
1720  # If the filter name is truncated to ... due to long timestamps, the pattern still matches,
1721  # e.g. ISF_SimEventFi... or ISF_SimEventFil...
1722  regExp = re.compile(r'ISF_SimEventFi[lter|...]+\s.*INFO.*accepted\s*(?P<events>[0-9]*)\s*out of\s*(?P<total>[0-9]*).*')
1723  try:
1724  myGen = lineByLine(log)
1725  except IOError as e:
1726  msg.warning('Failed to open transform logfile {0}: {1:s}'.format(log, e))
1727  return None
1728  resimevents = None
1729  passed_events = 0
1730  total_events = 0
1731  for line, lineCounter in myGen:
1732  m = regExp.match(line)
1733  if m:
1734  passed_events += int(m.group('events'))
1735  total_events += int(m.group('total'))
1736  resimevents = passed_events
1737 
1738  if resimevents is not None:
1739  msg.info("Summary of events that passed the ISF_SimEventFilter: {0} events out of {1} total".format(passed_events, total_events) )
1740  else:
1741  msg.warning("Returning null value for resimevents: no line matched the regExp for extracting events that passed the ISF_SimEventFilter")
1742 
1743  return resimevents
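For example (the log file name is hypothetical):

    from PyJobTransforms.trfUtils import reportEventsPassedSimFilter

    resimevents = reportEventsPassedSimFilter('log.ReSim')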
PyJobTransforms.trfSignal
Signal handling utilities for ATLAS job transforms.
python.trfUtils.isInteractiveEnv
def isInteractiveEnv()
Definition: trfUtils.py:703
replace
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition: hcg.cxx:307
python.trfUtils.bind_port
def bind_port(host, port)
Definition: trfUtils.py:1698
python.trfUtils.Job._name
_name
Definition: trfUtils.py:733
python.trfUtils.Job.printout
def printout(self)
print in a human-readable way the items of the object self
Definition: trfUtils.py:773
python.trfUtils.printHR
def printHR(the_object)
print in a human-readable way the items of a given object
Definition: trfUtils.py:660
mean
void mean(std::vector< double > &bins, std::vector< double > &values, const std::vector< std::string > &files, const std::string &histname, const std::string &tplotname, const std::string &label="")
Definition: dependence.cxx:254
python.trfUtils.Fit.slope
def slope(self)
Definition: trfUtils.py:1443
python.trfUtils.memFileToTable
Extract a table of data from a txt file.
Definition: trfUtils.py:1500
python.trfUtils.pickledDump
def pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
Definition: trfUtils.py:598
python.trfExceptions.TransformSetupException
Setup exceptions.
Definition: trfExceptions.py:42
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfUtils.lineByLine
def lineByLine(filename, strip=True, removeTimestamp=True, substepName=None)
Generator to return lines and line count from a file.
Definition: trfUtils.py:372
python.trfUtils.Fit.value
def value(self, t)
Definition: trfUtils.py:1417
CaloCellPos2Ntuple.int
int
Definition: CaloCellPos2Ntuple.py:24
python.trfUtils.Job.resultGetter
resultGetter
Definition: trfUtils.py:731
python.trfUtils.ParallelJobProcessor.__str__
def __str__(self)
return an object self-description string
Definition: trfUtils.py:908
python.trfUtils.JobGroup.results
results
Definition: trfUtils.py:804
python.trfUtils.Job.__init__
def __init__(self, workFunction=None, workFunctionKeywordArguments={}, workFunctionTimeout=None, name=None)
initialisation method
Definition: trfUtils.py:726
python.trfUtils.getAncestry
def getAncestry(listMyOrphans=False)
List all processes and parents and form a dictionary where the parent key lists all child PIDs.
Definition: trfUtils.py:62
python.trfSignal.resetTrfSignalHandlers
def resetTrfSignalHandlers()
Restore signal handlers to the default ones.
Definition: trfSignal.py:40
python.trfUtils.Fit
Low-level fitting class.
Definition: trfUtils.py:1375
python.trfUtils.prettyXML
def prettyXML(element, indent=' ', poolFileCatalogFormat=False)
XML pretty print an ElementTree.ELement object.
Definition: trfUtils.py:397
python.trfUtils.calcCpuTime
def calcCpuTime(start, stop)
Definition: trfUtils.py:1683
python.trfUtils.Fit.set_chi2
def set_chi2(self)
Definition: trfUtils.py:1421
python.trfUtils.VTuneCommand
def VTuneCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
return VTune command @detail This function returns a VTune command for use with Athena.
Definition: trfUtils.py:1642
python.trfUtils.Fit.set_slope
def set_slope(self)
Definition: trfUtils.py:1436
MuonGM::round
float round(const float toRound, const unsigned int decimals)
Definition: Mdt.cxx:27
python.trfUtils.cmpMetadata
def cmpMetadata(metadata1, metadata2, guidCheck='valid')
Compare metadata for files, but taking into account that GUID can vary.
Definition: trfUtils.py:464
python.trfUtils.ValgrindCommand
def ValgrindCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
Definition: trfUtils.py:1576
python.trfUtils.JobGroup.timeoutStatus
def timeoutStatus(self)
return Boolean JobGroup timeout status
Definition: trfUtils.py:835
python.trfUtils.JobGroup.className
className
Definition: trfUtils.py:792
python.trfUtils.ParallelJobProcessor.pool
pool
Definition: trfUtils.py:884
upper
int upper(int c)
Definition: LArBadChannelParser.cxx:49
python.trfUtils.ParallelJobProcessor.status
status
Definition: trfUtils.py:877
python.trfUtils.JobGroup.printout
def printout(self)
print in a human-readable way the items of the object self
Definition: trfUtils.py:850
PyJobTransforms.trfArgClasses
Transform argument class definitions.
python.trfUtils.ParallelJobProcessor.submit
def submit(self, jobSubmission=None)
submit a Job object or a JobGroup object for processing
Definition: trfUtils.py:928
python.trfUtils.ParallelJobProcessor.jobSubmission
jobSubmission
Definition: trfUtils.py:874
PyJobTransforms.trfExitCodes
Module for transform exit codes.
dumpHVPathFromNtuple.append
bool append
Definition: dumpHVPathFromNtuple.py:91
python.trfUtils.JobGroup.name
def name(self)
Definition: trfUtils.py:812
python.trfUtils.ParallelJobProcessor.getResults
def getResults(self)
get results of JobGroup object submission
Definition: trfUtils.py:1008
python.trfUtils.reportEventsPassedSimFilter
def reportEventsPassedSimFilter(log)
Summarize the events that passed the ISF_SimEventFilter; sums up all events passed by the filter.
Definition: trfUtils.py:1716
reduce
void reduce(HepMC::GenEvent *ge, std::vector< HepMC::GenParticlePtr > toremove)
Remove unwanted particles from the event, collapsing the graph structure consistently.
Definition: FixHepMC.cxx:81
python.trfUtils.isodate
def isodate()
Return an ISO-formatted 'now' string.
Definition: trfUtils.py:434
python.trfUtils.Fit._ss2
_ss2
Definition: trfUtils.py:1382
python.trfUtils.Job.name
def name(self)
Definition: trfUtils.py:754
python.trfExceptions.TransformExecutionException
Base class for execution exceptions.
Definition: trfExceptions.py:62
python.trfUtils.Fit._xm
_xm
Definition: trfUtils.py:1379
python.trfUtils.Fit.intersect
def intersect(self)
Definition: trfUtils.py:1454
python.trfUtils.Job.__str__
def __str__(self)
return an object self-description string
Definition: trfUtils.py:761
python.trfUtils.cliToKey
def cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
Definition: trfUtils.py:652
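A one-line sketch for cliToKey; the exact normalisation is an assumption based on argparse's usual dest-style keys:

    from PyJobTransforms.trfUtils import cliToKey

    key = cliToKey('--maxEvents')  # expected to yield 'maxEvents'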
python.trfUtils.Fit.fit
def fit(self)
Definition: trfUtils.py:1413
python.trfUtils.Job.workFunctionKeywordArguments
workFunctionKeywordArguments
Definition: trfUtils.py:728
python.trfUtils.call
def call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10)
Definition: trfUtils.py:155
python.trfUtils.setupDBRelease
def setupDBRelease(setup)
Run a DBRelease setup.
Definition: trfUtils.py:543
convertTimingResiduals.sum
sum
Definition: convertTimingResiduals.py:55
python.trfUtils.analytic._fit
_fit
Definition: trfUtils.py:1254
python.trfUtils.Fit.set_intersect
def set_intersect(self)
Definition: trfUtils.py:1447
python.trfUtils.ParallelJobProcessor
ParallelJobProcessor: a multiple-process processor of Job objects.
Definition: trfUtils.py:867
python.trfUtils.ParallelJobProcessor.printout
def printout(self)
print the items of the object self in a human-readable way
Definition: trfUtils.py:920
python.trfUtils.Fit._ss
_ss
Definition: trfUtils.py:1381
python.trfUtils.asetupReleaseIsOlderThan
def asetupReleaseIsOlderThan(asetup_string, major, minor=None)
Test (to the best of our knowledge) if the asetup release is older than a major, minor version number.
Definition: trfUtils.py:303
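A hedged sketch for asetupReleaseIsOlderThan; the 'Project,release' format of the asetup string is an assumption:

    from PyJobTransforms.trfUtils import asetupReleaseIsOlderThan

    # Hypothetical asetup string; check against release 22.50
    if asetupReleaseIsOlderThan('Athena,22.0.40', 22, minor=50):
        print('Release predates 22.50; applying legacy behaviour')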
python.trfUtils.memFileToTable.getTable
def getTable(self, filename, header=None, separator="\t")
Definition: trfUtils.py:1502
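A sketch for memFileToTable.getTable, assuming the class can be instantiated without arguments and the input is a tab-separated text file (the file name is hypothetical):

    from PyJobTransforms.trfUtils import memFileToTable

    table = memFileToTable().getTable('mem_monitor.txt', header=None, separator='\t')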
python.trfUtils.analytic.formatBytes
def formatBytes(self, size)
Definition: trfUtils.py:1363
python.trfUtils.analytic.extractFromTable
def extractFromTable(self, table, x_name, y_name)
Definition: trfUtils.py:1335
python.trfUtils.analytic
Analytics service class.
Definition: trfUtils.py:1252
python.trfUtils.units
def units(quantity=None, unitSingular="unit", unitPlural="units")
return either singular or plural units as appropriate for a given quantity
Definition: trfUtils.py:691
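units returns the singular or plural form as appropriate for the quantity; a minimal example:

    from PyJobTransforms.trfUtils import units

    print('1 {0}'.format(units(1, unitSingular='file', unitPlural='files')))  # '1 file'
    print('3 {0}'.format(units(3, unitSingular='file', unitPlural='files')))  # '3 files'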
PyAthena::repr
std::string repr(PyObject *o)
returns the string representation of a python object, equivalent to calling repr(o) in python
Definition: PyAthenaUtils.cxx:106
python.trfUtils.convertToStr
def convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
Definition: trfUtils.py:636
python.trfUtils.JobGroup.timeout
timeout
Definition: trfUtils.py:801
python.trfExceptions.TransformInternalException
Transform internal errors.
Definition: trfExceptions.py:74
histSizes.list
def list(name, path='/')
Definition: histSizes.py:38
python.trfUtils.Job
Job: a set of pieces of information relevant to a given work function.
Definition: trfUtils.py:723
python.trfUtils.Fit._x
_x
Definition: trfUtils.py:1377
python.trfUtils.math.mean
def mean(self, data)
Definition: trfUtils.py:1463
python.trfUtils.uniqueIdentifier
def uniqueIdentifier()
return a URL-safe, base 64-encoded pseudorandom UUID
Definition: trfUtils.py:678
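A one-line sketch for uniqueIdentifier, whose return value is suitable for building unique, URL-safe names:

    from PyJobTransforms.trfUtils import uniqueIdentifier

    jobTag = uniqueIdentifier()  # URL-safe, base64-encoded pseudorandom UUID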
DerivationFramework::TriggerMatchingUtils::sorted
std::vector< typename T::value_type > sorted(T begin, T end)
Helper function to create a sorted vector from an unsorted one.
python.trfUtils.ParallelJobProcessor.countOfJobs
countOfJobs
Definition: trfUtils.py:882
CxxUtils::set
constexpr std::enable_if_t< is_bitmask_v< E >, E & > set(E &lhs, E rhs)
Convenience function to set bits in a class enum bitmask.
Definition: bitmask.h:224
python.trfExceptions.TransformTimeoutException
Exception used by time limited executions.
Definition: trfExceptions.py:78
python.trfUtils.Fit.__init__
def __init__(self, **kwargs)
Definition: trfUtils.py:1387
python.trfUtils.JobGroup.__str__
def __str__(self)
return an object self-description string
Definition: trfUtils.py:819
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.trfUtils.analytic.__init__
def __init__(self, **kwargs)
Definition: trfUtils.py:1256
python.trfUtils.releaseIsOlderThan
def releaseIsOlderThan(major, minor=None)
Test (to the best of our knowledge) if the current release is older than a major, minor version number.
Definition: trfUtils.py:261
python.trfUtils.math.chi2
def chi2(self, observed, expected)
Definition: trfUtils.py:1483
python.trfUtils.math.sum_square_dev
def sum_square_dev(self, data)
Definition: trfUtils.py:1471
python.trfUtils.JobGroup
JobGroup: a set of Job objects and pieces of information relevant to a given set of Job objects.
Definition: trfUtils.py:787
python.trfUtils.ParallelJobProcessor.statusReport
def statusReport(self)
return a status report string
Definition: trfUtils.py:1157
python.trfUtils.initialise_processes
def initialise_processes()
initialisation procedure for processes of the process pool
Definition: trfUtils.py:855
python.trfUtils.ParallelJobProcessor.__init__
def __init__(self, jobSubmission=None, numberOfProcesses=multiprocessing.cpu_count())
initialisation method that accepts submissions and starts pool
Definition: trfUtils.py:873
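Putting Job, JobGroup and ParallelJobProcessor together; a minimal sketch assembled from the signatures listed in this index (the work function and its arguments are hypothetical):

    import multiprocessing
    from PyJobTransforms.trfUtils import Job, JobGroup, ParallelJobProcessor

    def square(x=1):  # hypothetical work function
        return x * x

    jobs = JobGroup(name='squares',
                    jobs=[Job(workFunction=square,
                              workFunctionKeywordArguments={'x': n},
                              workFunctionTimeout=60) for n in range(4)])
    processor = ParallelJobProcessor(numberOfProcesses=multiprocessing.cpu_count())
    processor.submit(jobSubmission=jobs)
    results = processor.getResults()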
TrigJetMonitorAlgorithm.items
items
Definition: TrigJetMonitorAlgorithm.py:79
python.trfUtils.math.sum_dev
def sum_dev(self, x, y)
Definition: trfUtils.py:1477
python.trfUtils.Fit._math
_math
Definition: trfUtils.py:1392
python.trfUtils.calcWallTime
def calcWallTime(start, stop)
Definition: trfUtils.py:1691
python.trfUtils.findFile
def findFile(pathvar, fname)
Find a named file along a colon-separated PATH-type variable.
Definition: trfUtils.py:37
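A usage sketch for findFile; DATAPATH is a typical colon-separated path variable in an ATLAS environment, and the file name is hypothetical:

    import os
    from PyJobTransforms.trfUtils import findFile

    fullPath = findFile(os.environ.get('DATAPATH', ''), 'myGeometry.db')
    if fullPath is None:
        print('File not found on DATAPATH')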
python.trfUtils.cvmfsDBReleaseCheck
def cvmfsDBReleaseCheck(dbrelease)
Validate a DBRelease exists on cvmfs and return the path to the setup script.
Definition: trfUtils.py:569
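cvmfsDBReleaseCheck pairs naturally with setupDBRelease; a hedged sketch (the DBRelease version is hypothetical; the return value is the setup script path, per the description above):

    from PyJobTransforms.trfUtils import cvmfsDBReleaseCheck, setupDBRelease

    setupScript = cvmfsDBReleaseCheck('31.8.1')  # hypothetical DBRelease version
    setupDBRelease(setupScript)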
Trk::open
@ open
Definition: BinningType.h:40
python.trfUtils.ParallelJobProcessor.listOfNamesOfRemainingJobs
listOfNamesOfRemainingJobs
Definition: trfUtils.py:949
python.trfUtils.Job.className
className
Definition: trfUtils.py:730
python.trfUtils.Job.workFunction
workFunction
Definition: trfUtils.py:727
python.trfUtils.Fit._intersect
_intersect
Definition: trfUtils.py:1384
python.trfUtils.ParallelJobProcessor._terminate
def _terminate(self)
terminate parallel job processor
Definition: trfUtils.py:1227
python.trfUtils.Fit._chi2
_chi2
Definition: trfUtils.py:1385
python.trfUtils.JSONDump
def JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
Definition: trfUtils.py:617
python.trfUtils.ParallelJobProcessor._abort
def _abort(self)
abort parallel job processor
Definition: trfUtils.py:1214
python.trfUtils.JobGroup.completeStatus
completeStatus
Definition: trfUtils.py:793
python.trfUtils.JobGroup.timeStampSubmission
timeStampSubmission
Definition: trfUtils.py:794
python.trfUtils.JobGroup._name
_name
Definition: trfUtils.py:796
python.trfUtils.analytic.getFittedData
def getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5)
Definition: trfUtils.py:1292
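A sketch for analytic.getFittedData, assuming the input is a memory-monitor table with 'Time' and 'pss' columns and that the return value describes the fitted line (the file name is hypothetical):

    from PyJobTransforms.trfUtils import analytic

    service = analytic()
    fitted = service.getFittedData('mem_monitor.txt', x_name='Time', y_name='pss',
                                   precision=2, tails=False, minPoints=5)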
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
pickleTool.object
object
Definition: pickleTool.py:30
str
Definition: BTagTrackIpAccessor.cxx:11
python.trfUtils.Fit._model
_model
Definition: trfUtils.py:1376
python.trfUtils.Fit._slope
_slope
Definition: trfUtils.py:1383
python.trfUtils.analytic.slope
def slope(self)
Definition: trfUtils.py:1273
python.trfUtils.shQuoteStrings
def shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
Definition: trfUtils.py:361
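A minimal example for shQuoteStrings, which defaults to quoting sys.argv:

    import sys
    from PyJobTransforms.trfUtils import shQuoteStrings

    print(' '.join(shQuoteStrings(sys.argv)))  # safe to cut and paste back to a shell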
python.trfUtils.Fit._y
_y
Definition: trfUtils.py:1378
python.trfUtils.math
some mathematical tools
Definition: trfUtils.py:1460
python.trfUtils.JobGroup.__init__
def __init__(self, jobs=None, name=None, timeout=None)
initialisation method
Definition: trfUtils.py:790
python.trfUtils.memFileToTable._defineTableDictKeys
def _defineTableDictKeys(self, header, fields, separator)
Define the keys for the tabledict dictionary.
Definition: trfUtils.py:1535
python.trfUtils.Fit.chi2
def chi2(self)
Definition: trfUtils.py:1432
python.trfUtils.forceToAlphaNum
def forceToAlphaNum(string)
Strip a string down to alpha-numeric characters only.
Definition: trfUtils.py:443
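A one-line example for forceToAlphaNum; the input string is arbitrary:

    from PyJobTransforms.trfUtils import forceToAlphaNum

    safe = forceToAlphaNum('run-2024 data!')  # expected to keep only alphanumeric characters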
python.trfUtils.ParallelJobProcessor.numberOfProcesses
numberOfProcesses
Definition: trfUtils.py:875
readCCLHist.float
float
Definition: readCCLHist.py:83
Trk::split
@ split
Definition: LayerMaterialProperties.h:38
python.trfUtils.unpackDBRelease
def unpackDBRelease(tarball, dbversion=None)
Ensure that the DBRelease tarball has been unpacked.
Definition: trfUtils.py:520
python.trfUtils.ParallelJobProcessor.className
className
Definition: trfUtils.py:876
python.trfUtils.asetupReport
def asetupReport()
Return a string with a report of the current athena setup.
Definition: trfUtils.py:223
python.trfUtils.listChildren
def listChildren(psTree=None, parent=os.getpid(), listOrphans=False)
Find all the children of a particular PID (calls itself recursively to descend into each leaf)
Definition: trfUtils.py:109
python.trfUtils.analytic.fit
def fit(self, x, y, model='linear')
Fitting function. For a linear model: y(x) = slope * x + intersect.
Definition: trfUtils.py:1264
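A sketch for analytic.fit using the linear model documented above (y(x) = slope * x + intersect); the data points are made up, and it is assumed that slope() reports the fitted gradient afterwards:

    from PyJobTransforms.trfUtils import analytic

    service = analytic()
    service.fit(x=[0, 1, 2, 3], y=[1.0, 3.1, 4.9, 7.2], model='linear')
    gradient = service.slope()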
python.trfUtils.Fit._ym
_ym
Definition: trfUtils.py:1380
python.trfUtils.unpackTarFile
def unpackTarFile(filename, directory=".")
Unpack a given tarfile.
Definition: trfUtils.py:501
python.trfUtils.Job.workFunctionTimeout
workFunctionTimeout
Definition: trfUtils.py:729
python.trfUtils.ParallelJobProcessor.countOfRemainingJobs
countOfRemainingJobs
Definition: trfUtils.py:883
python.trfUtils.infanticide
def infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.
Definition: trfUtils.py:132
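listChildren and infanticide are designed to work together; a hedged sketch for cleaning up descendants of the current process:

    from PyJobTransforms.trfUtils import listChildren, infanticide

    children = listChildren()        # descendants of this process, deepest first
    infanticide(childPIDs=children)  # SIGTERM, short grace period, then SIGKILL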
python.trfUtils.JobGroup.jobs
jobs
Definition: trfUtils.py:791