18import multiprocessing.pool
22from datetime
import datetime
23from subprocess
import Popen, STDOUT, PIPE
24from xml.dom
import minidom
25from xml.parsers.expat
import ExpatError
26from xml.etree
import ElementTree
29import PyJobTransforms.trfExceptions
as trfExceptions
32from functools
import reduce
33msg = logging.getLogger(__name__)
41 msg.debug(
'Finding full path for {fileName} in path {path}'.format(
45 if fname.startswith(
'/'):
49 pathElements = pathvar.split(
':')
50 for pathElement
in pathElements:
51 if path.exists(path.join(pathElement, fname)):
52 return(path.join(pathElement, fname))
65 psCmd = [
'ps',
'ax',
'-o',
'pid,ppid,pgid,args',
'-m']
68 msg.debug(
'Executing %s', psCmd)
69 p = Popen(psCmd, stdout=PIPE, stderr=PIPE)
70 stdout = p.communicate()[0]
73 msg.error(
'Failed to execute "ps" to get process ancestry: %s', repr(e))
79 for line
in stdout.decode().
split(
'\n'):
81 (pid, ppid, pgid, cmd) = line.split(
None, 3)
89 childDict[ppid].append(pid)
91 childDict[ppid] = [pid]
92 if listMyOrphans
and ppid == 1
and pgid == myPgid:
93 msg.info(
"Adding PID {0} to list of my children as it seems to be orphaned: {1}".format(pid, cmd))
94 if myPid
in childDict:
95 childDict[myPid].append(pid)
97 childDict[myPid] = [pid]
111def listChildren(psTree = None, parent = os.getpid(), listOrphans =
False):
112 '''Take a psTree dictionary and list all children'''
116 msg.debug(
"List children of %d (%s)", parent, psTree.get(parent, []))
119 children.extend(psTree[parent])
120 for child
in psTree[parent]:
134def infanticide(childPIDs = None, sleepTime = 3, message = True, listOrphans = False):
135 if childPIDs
is None:
138 if len(childPIDs) > 0
and message:
139 msg.info(
'Killing these child processes: {0}...'.format(childPIDs))
141 for pid
in childPIDs:
143 os.kill(pid, signal.SIGTERM)
147 time.sleep(sleepTime)
149 for pid
in childPIDs:
151 os.kill(pid, signal.SIGKILL)
157def call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10):
160 line=p.stdout.readline()
162 line=
"%s%s" % (message, line.rstrip())
166 logger.log(loglevel, line)
169 line=p.stdout.readline()
171 line=
"%s%s" % (message, line.strip())
175 logger.log(loglevel, line)
176 line=p.stdout.readline()
179 loglevel=logging.DEBUG
181 if timeout
is None or timeout<=0:
182 msg.info(
'Executing %s...', args)
183 starttime = time.time()
184 p=Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
185 while p.poll()
is None:
188 if timeout
is not None:
189 msg.info(
'Executed call within %d s.', time.time()-starttime)
195 msg.info(
'Try %i out of %i (time limit %ss) to call %s.', n+1, retry+1, timeout, args)
196 starttime = time.time()
197 endtime=starttime+timeout
198 p=Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
199 while p.poll()
is None and time.time()<endtime:
202 msg.warning(
'Timeout limit of %d s reached. Kill subprocess and its children.', timeout)
207 msg.info(
'Checking if something is left in buffer.')
210 msg.info(
'Going to sleep for %d s.', sleeptime)
211 time.sleep(sleeptime)
214 sleeptime*=timefactor
217 msg.info(
'Executed call within %d s.', time.time()-starttime)
220 msg.warning(
'All %i tries failed!', n)
227 eVars = [
'AtlasBaseDir',
'AtlasProject',
'AtlasVersion',
'AtlasPatch',
'AtlasPatchVersion',
'CMTCONFIG',
'TestArea']
228 if "AtlasProject" in os.environ:
229 CMake_Platform =
"{0}_PLATFORM".format(os.environ[
"AtlasProject"])
230 if CMake_Platform
in os.environ:
231 eVars.remove(
"CMTCONFIG")
232 eVars.append(CMake_Platform)
234 if eVar
in os.environ:
235 setupMsg +=
'\t%s=%s\n' % (eVar, os.environ[eVar])
237 if 'WorkDir_DIR' in os.environ
and os.access(os.environ[
'WorkDir_DIR'], os.R_OK):
248 setupMsg+=
"No readable patch area found"
250 return setupMsg.rstrip()
264 if 'AtlasVersion' not in os.environ
or 'AtlasBaseDir' not in os.environ:
265 msg.warning(
"Could not find 'AtlasVersion' and 'AtlasBaseDir' in the environment - no release match possible")
269 relRegExp = re.compile(
r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
270 relMatch = re.match(relRegExp, os.environ[
'AtlasVersion'])
273 leafDir = path.basename(os.environ[
'AtlasBaseDir'])
274 relMatch = re.match(relRegExp, leafDir)
276 msg.info(
'No identifiable numbered release found from AtlasVersion or AtlasBaseDir - assuming dev/devval/mig')
279 relmajor = int(relMatch.group(
'major'))
280 relminor = int(relMatch.group(
'minor'))
281 msg.info(
'Detected release major {0}, minor {1} (.{2}) from environment'.format(relmajor, relminor, relMatch.group(
'other')))
290 if minor
is None or relminor >= minor:
294 except Exception
as e:
295 msg.warning(
'Exception thrown when attempting to detect athena version ({0}). No release check possible'.format(e))
311 split_string = asetup_string.split(
',')
313 if 'master' in split_string:
317 reg_exp = re.compile(
r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
318 for part
in split_string:
320 match = re.match(reg_exp, part)
322 relmajor = int(match.group(
'major'))
323 relminor = int(match.group(
'minor'))
324 msg.info(
'Detected asetup release {0}.{1}(.{2})'.format(relmajor, relminor, match.group(
'other')))
329 reg_exp = re.compile(
r'(?P<major>\d+)\.(?P<minor>\d+)')
330 for part
in split_string:
332 match = re.match(reg_exp, part)
334 relmajor = int(match.group(
'major'))
335 relminor = int(match.group(
'minor'))
336 msg.info(
'Detected asetup release {0}.{1}'.format(relmajor, relminor))
341 raise RuntimeError(
'asetup version could not be parsed')
350 if minor
is None or relminor >= minor:
354 except Exception
as e:
355 msg.warning(
'Exception thrown when attempting to detect asetup athena version ({0}) from {1}. No release check possible'.format(e, asetup_string))
364 return [
"'" + qstring.replace(
"'",
"\\'") +
"'" for qstring
in strArray ]
374def lineByLine(filename, strip=True, removeTimestamp=True, substepName=None):
376 encargs = {
'encoding' :
'utf8'}
377 f = open(filename,
'r', **encargs)
380 if substepName
and isinstance(substepName, str):
381 line = line.lstrip(substepName)
383 line = line.lstrip(
'0123456789:-, ')
386 yield line, linecounter
399def prettyXML(element, indent = ' ', poolFileCatalogFormat = False):
402 xmlstring = ElementTree.tostring(element,
'utf-8')
404 metadataDoc = minidom.parseString(xmlstring)
407 msg.warning(
'Error parsing ElementTree string - will try removing hex literals ({0!r})'.format(xmlstring))
408 xmlstring = xmlstring.replace(
'\x00',
'')
409 metadataDoc = minidom.parseString(xmlstring)
412 if poolFileCatalogFormat
is False:
413 return metadataDoc.toprettyxml(indent=indent, encoding=
'UTF-8')
417 imp = minidom.DOMImplementation()
418 doctype = imp.createDocumentType(qualifiedName=
'POOLFILECATALOG', publicId=
'', systemId=
'InMemory')
419 doc = imp.createDocument(
None,
'POOLFILECATALOG', doctype)
423 refel = doc.documentElement
424 for child
in metadataDoc.childNodes:
425 if child.nodeType==child.ELEMENT_NODE:
426 doc.replaceChild(doc.importNode(child,
True), doc.documentElement)
428 elif child.nodeType!=child.DOCUMENT_TYPE_NODE:
429 doc.insertBefore(doc.importNode(child,
True), refel)
431 return doc.toprettyxml(indent=indent, encoding=
'UTF-8')
437 return datetime.now().
replace(microsecond=0).isoformat()
446 if string
is None or string.isalnum():
452 msg.warning(
"String {0} was stripped to alphanumeric characters only: {1}".format(string, newstring))
468 allFiles =
set(metadata1) |
set(metadata2)
469 if len(allFiles) > len(metadata1)
or len(allFiles) > len(metadata2):
470 msg.warning(
'In metadata comparison file lists are not equal - fails ({0} != {1}'.format(metadata1, metadata2))
472 for fname
in allFiles:
473 allKeys =
set(metadata1[fname]) |
set(metadata2[fname])
474 if len(allKeys) > len(metadata1[fname])
or len(allFiles) > len(metadata2[fname]):
475 msg.warning(
'In metadata comparison key lists are not equal - fails')
478 if key ==
'file_guid':
479 if guidCheck ==
'ignore':
481 elif guidCheck ==
'equal':
482 if metadata1[fname][
'file_guid'].
upper() == metadata2[fname][
'file_guid'].
upper():
485 msg.warning(
'In metadata comparison strict GUID comparison failed.')
487 elif guidCheck ==
'valid':
489 uuid.UUID(metadata1[fname][
'file_guid'])
490 uuid.UUID(metadata2[fname][
'file_guid'])
493 msg.warning(
'In metadata comparison found invalid GUID strings.')
495 if metadata1[fname][key] != metadata2[fname][key]:
496 msg.warning(
'In metadata comparison found different key values: {0!s} != {1!s}'.format(metadata1[fname][key], metadata2[fname][key]))
505 tar = tarfile.open(filename)
506 tar.extractall(path=directory)
508 except Exception
as e:
509 errMsg =
'Error encountered while unpacking {0} to {1}: {2}'.format(filename, directory, e)
523 if dbversion
is None:
524 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(tarball))
527 'Could not find a valid version in the DBRelease tarball: {0}'.format(tarball))
528 dbversion = dbdMatch.group(1)
529 dbsetup = path.abspath(path.join(
"DBRelease", dbversion,
"setup.py"))
530 if os.access(dbsetup, os.R_OK):
531 msg.debug(
'DBRelease {0} is already unpacked, found {1}'.format(tarball, dbsetup))
532 return False, dbsetup
534 msg.debug(
'Will attempt to unpack DBRelease {0}'.format(tarball))
536 msg.info(
'DBRelease {0} was unpacked'.format(tarball))
537 if not os.access(dbsetup, os.R_OK):
539 'DBRelease setup file {0} was not readable, even after unpacking {1}'.format(dbsetup, tarball))
547 dbdir=path.abspath(path.dirname(setup))
548 msg.debug(
'Will add {0} to sys.path to load DBRelease setup module'.format(dbdir))
551 sys.path.insert(0, dbdir)
552 from setup
import Setup
556 msg.debug(
'DBRelease setup module was initialised successfully')
557 except ImportError
as e:
558 errMsg =
'Import error while trying to load DB Setup module: {0}'.format(e)
561 except Exception
as e:
562 errMsg =
'Unexpected error while trying to load DB Setup module: {0}'.format(e)
573 dbdMatch = re.match(
r'([\d\.]+|current)$', dbrelease)
574 msg.debug(
'Attempting to setup DBRelease {0} from cvmfs'.format(dbrelease))
576 if 'VO_ATLAS_SW_DIR' in os.environ:
577 msg.debug(
'Found site defined path to ATLAS software: {0}'.format(os.environ[
'VO_ATLAS_SW_DIR']))
578 dbsetup = path.join(os.environ[
'VO_ATLAS_SW_DIR'],
'database',
'DBRelease', dbrelease,
'setup.py')
579 if os.access(dbsetup, os.R_OK):
581 msg.warning(
'Site defined path to ATLAS software seems invalid (failed to access {0}). Will also try standard cvmfs path.'.format(dbsetup))
583 msg.debug(
'Using standard CVMFS path to ATLAS software')
585 dbsetup = path.join(
'/cvmfs/atlas.cern.ch/repo/sw/database/DBRelease', dbrelease,
'setup.py')
586 if not os.access(dbsetup, os.R_OK):
588 'CVMFS DBRelease setup file {0} was not readable'.format(dbsetup))
589 msg.debug(
'Using cvmfs based dbrelease: {0}'.format(path.dirname(dbsetup)))
592 'Unable to interpret DBRelease "{0}" as either a tarball or a CVMFS release directory'.format(dbrelease))
601 if 'dumpPickle' not in argdict:
605 theArgumentDictionary = {}
606 for k, v
in argdict.items():
607 if k ==
'dumpPickle':
609 if isinstance(v, argument):
610 theArgumentDictionary[k] = getattr(v,
"dumpvalue", v.value)
612 theArgumentDictionary[k] = v
613 with open(argdict[
'dumpPickle'],
'wb')
as pickleFile:
614 import pickle
as pickle
615 pickle.dump(theArgumentDictionary, pickleFile)
620 if 'dumpJSON' not in argdict:
624 theArgumentDictionary = {}
625 for k, v
in argdict.items():
628 if isinstance(v, argument):
629 theArgumentDictionary[k] = getattr(v,
"dumpvalue", v.value)
631 theArgumentDictionary[k] = v
632 with open(argdict[
'dumpJSON'],
'w')
as JSONFile:
634 json.dump(theArgumentDictionary, JSONFile, sort_keys=
True, indent=2)
639 if isinstance(in_string, dict):
641 elif isinstance(in_string, list):
645 elif in_string.__class__.__name__ ==
'unicode':
646 return in_string.encode(
'utf-8')
647 elif in_string.__class__.__name__ ==
'bytes':
648 return in_string.decode(
'utf-8')
655 return option.lstrip(
'-').
replace(
'-',
'_')
664 if isinstance(the_object, dict):
665 for key, value
in sorted(the_object.items()):
666 print(
u'{key}: {value}'.format(key = key, value = value))
668 elif isinstance(the_object, (list, tuple)):
669 for element
in the_object:
681 return str(base64.urlsafe_b64encode(uuid.uuid4().bytes).
strip(
"="))
695 unitSingular = "unit",
706 isInteractiveEnv =
False
708 if 'PS1' in os.environ
or 'prompt' in os.environ:
709 isInteractiveEnv =
True
710 elif os.isatty(sys.stdout.fileno())
or os.isatty(sys.stdin.fileno()):
711 isInteractiveEnv =
True
713 return isInteractiveEnv
731 workFunctionKeywordArguments = {},
732 workFunctionTimeout = None,
745 exceptionMessage =
"work function not specified"
746 msg.error(
"{notifier}: exception message: {exceptionMessage}".format(
748 exceptionMessage = exceptionMessage
751 trfExit.nameToCode(
'TRF_INTERNAL'),
764 descriptionString =
""
765 for key, value
in sorted(vars(self).items()):
766 descriptionString += str(
"{key}:{value} ".format(
770 return descriptionString
809 for job
in self.
jobs:
810 self.
timeout += job.workFunctionTimeout
822 descriptionString =
""
823 for key, value
in sorted(vars(self).items()):
824 descriptionString += str(
"{key}:{value} ".format(
828 return descriptionString
862 resetTrfSignalHandlers()
863 signal.signal(signal.SIGINT, signal.SIG_IGN)
877 jobSubmission = None,
878 numberOfProcesses = multiprocessing.cpu_count(),
888 msg.debug(f
"{self.className}: current process: {multiprocessing.current_process().name}")
890 msg.debug(
"{notifier}: status: {status}".format(
897 self.
pool = multiprocessing.pool.Pool(
899 initialise_processes,
904 msg.debug(
"{notifier}: pool of {numberOfProcesses} {units} with start method {startMethod!r} created".format(
908 unitSingular =
"process", unitPlural =
"processes"),
912 msg.debug(
"{notifier}: status: {status}".format(
922 descriptionString =
""
923 for key, value
in sorted(vars(self).items()):
924 descriptionString += str(
"{key}:{value} ".format(
928 return descriptionString
947 if jobSubmission
is not None:
950 msg.debug(
"{notifier}: status: {status}".format(
968 msg.debug(
"{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
974 unitSingular =
"job",
979 msg.debug(
"{notifier}: submitting job group submission '{name}' to pool".format(
986 job.timeStampSubmission = time.time()
987 msg.debug(
"{notifier}: job '{name}' submitted to pool".format(
994 job.resultGetter = self.
pool.apply_async(
995 func = job.workFunction,
996 kwds = job.workFunctionKeywordArguments
1001 msg.debug(
"{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
1006 unitSingular =
"job",
1009 timeStampSubmission = self.
jobSubmission.timeStampSubmission
1011 self.
status =
"processing"
1012 msg.debug(
"{notifier}: status: {status}".format(
1034 msg.debug(
"{notifier}: checking for job {units}".format(
1038 unitSingular =
"result",
1039 unitPlural =
"results")
1051 msg.error(
"{notifier}: job group '{name}' timed out".format(
1056 exceptionMessage =
"timeout of a function in list {listOfNamesOfRemainingJobs}".format(
1059 msg.error(
"{notifier}: exception message: {exceptionMessage}".format(
1061 exceptionMessage = exceptionMessage
1064 trfExit.nameToCode(
'TRF_EXEC_TIMEOUT'),
1069 if not hasattr(job,
'result'):
1071 if job.resultGetter.ready():
1073 "{notifier}: result ready for job '{name}'".format(
1078 job.successStatus = job.resultGetter.successful()
1080 "{notifier}: job '{name}' success status: {successStatus}".format(
1083 successStatus = job.successStatus
1088 if job.successStatus:
1089 job.result = job.resultGetter.get()
1091 "{notifier}: result of job '{name}': {result}".format(
1100 "{notifier}: {countOfRemainingJobs} {units} remaining".format(
1105 unitSingular =
"job",
1109 + f
"\n names of remaining jobs: {self.listOfNamesOfRemainingJobs}"
1113 elif not job.successStatus:
1115 "{notifier}: job '{name}' failed".format(
1121 exceptionMessage =
"failure of function '{name}' with arguments {arguments}".format(
1122 name = job.workFunction.__name__,
1123 arguments = job.workFunctionKeywordArguments
1125 msg.error(
"{notifier}: exception message: {exceptionMessage}".format(
1127 exceptionMessage = exceptionMessage
1130 trfExit.nameToCode(
'TRF_EXEC_FAIL'),
1138 msg.debug(
"{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
1143 unitSingular =
"job",
1149 msg.debug(
"{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
1154 unitSingular =
"job",
1169 statusReport =
"{notifier}:\n status report:".format(
1173 statusReport +=
"\n parallel job processor configuration:"
1174 statusReport +=
"\n status: {notifier}".format(
1175 notifier = str(self.
status)
1177 statusReport +=
"\n number of processes: {notifier}".format(
1181 statusReport +=
"\n job group submission: '{notifier}'".format(
1184 statusReport +=
"\n total number of jobs: {notifier}".format(
1187 statusReport +=
"\n number of incomplete jobs: {notifier}".format(
1190 statusReport +=
"\n names of incomplete jobs: {notifier}".format(
1195 statusReport +=
"\n jobs:"
1197 statusReport +=
"\n job '{name}':".format(
1200 statusReport +=
"\n workFunction: '{name}'".format(
1201 name = job.workFunction.__name__
1203 statusReport +=
"\n workFunctionKeywordArguments: '{arguments}'".format(
1204 arguments = job.workFunctionKeywordArguments
1206 statusReport +=
"\n workFunctionTimeout: '{timeout}'".format(
1207 timeout = job.workFunctionTimeout
1209 if hasattr(job,
'result'):
1210 statusReport +=
"\n result: '{result}'".format(
1215 statusReport +=
"\n statistics:"
1217 statusReport +=
"\n total processing time: {processingTime} s".format(
1227 msg.debug(
"{notifier}: status: {status}".format(
1239 self.
status =
"terminating"
1240 msg.debug(
"{notifier}: status: {status}".format(
1244 msg.debug(
"{notifier}: terminating pool of {numberOfProcesses} {units}".format(
1249 unitSingular =
"process",
1250 unitPlural =
"processes"
1253 self.
pool.terminate()
1256 msg.debug(
"{notifier}: status: {status}".format(
1275 def fit(self, x, y, model='linear'):
1277 self.
_fit =
Fit(x=x, y=y, model=model)
1278 except Exception
as e:
1279 msg.warning(
'fit failed! {0}'.format(e))
1290 msg.warning(
'Fit has not been defined')
1303 def getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5):
1306 table = _memFileToTable.getTable(filename, header=
None, separator=
"\t")
1315 tail = int(len(x)/5)
1316 msg.info(
'removing tails from the memory monitor data; 20% from each side')
1322 if len(x)==len(y)
and len(x) > minPoints:
1323 msg.info(
'fitting {0} vs {1}'.format(y_name, x_name))
1325 fit = self.
fit(x, y)
1326 _slope = self.
slope()
1327 except Exception
as e:
1328 msg.warning(
'failed to fit data, x={0}, y={1}: {2}'.format(x, y, e))
1331 slope = round(fit.slope(), precision)
1332 chi2 = round(fit.chi2(), precision)
1333 fitResult = {
"slope": slope,
"chi2": chi2}
1336 msg.info(
'slope of the fitted line: {0} {1} (using {2} data points, chi2={3})'.format(HRslope, unit, len(x), chi2))
1338 msg.warning(
'wrong length of table data, x={0}, y={1} (must be same and length>={2})'.format(x, y, minPoints))
1347 headerUpperVersion = {
'pss':
'PSS',
'swap':
'Swap',
'rss':
'RSS',
'vmem':
'VMEM'}
1348 x = table.get(x_name, [])
1349 if '+' not in y_name:
1350 y = table.get(y_name, [])
1352 y = table.get(headerUpperVersion[y_name], [])
1355 y1_name = y_name.split(
'+')[0]
1356 y2_name = y_name.split(
'+')[1]
1357 y1_value = table.get(y1_name, [])
1358 y2_value = table.get(y2_name, [])
1359 if len(y1_value)==0
or len(y2_value)==0:
1360 y1_value = table.get(headerUpperVersion[y1_name], [])
1361 y2_value = table.get(headerUpperVersion[y2_name], [])
1362 except Exception
as e:
1363 msg.warning(
'exception caught: {0}'.format(e))
1368 y = [x0 + y0
for x0, y0
in zip(y1_value, y2_value)]
1378 power_labels = {1:
'K', 2:
'M', 3:
'G', 4:
'T'}
1382 return round(size, 2), power_labels[n]+
'B/s'
1400 self.
_model = kwargs.get(
'model',
'linear')
1401 self.
_x = kwargs.get(
'x',
None)
1402 self.
_y = kwargs.get(
'y',
None)
1405 if not self.
_x or not self.
_y:
1406 msg.warning(
'input data not defined')
1408 if len(self.
_x) != len(self.
_y):
1409 msg.warning(
'input data (lists) have different lengths')
1412 if self.
_model ==
'linear':
1413 self.
_ss = self.
_math.sum_square_dev(self.
_x)
1422 msg.warning(
"\'{0}\' model is not implemented".format(self.
_model))
1434 y_observed = self.
_y
1437 y_expected.append(self.
value(x))
1438 if y_observed
and y_observed != []
and y_expected
and y_expected != []:
1477 msg.warning(
'mean requires at least one data point')
1484 return sum((x - c) ** 2
for x
in data)
1491 return sum((_x - c1) * (_y - c2)
for _x, _y
in zip(x, y))
1494 def chi2(self, observed, expected):
1497 return sum((_o - _e) ** 2 / _e
for _o, _e
in zip(observed, expected))
1513 def getTable(self, filename, header=None, separator="\t"):
1517 f = open(filename,
'r')
1518 except Exception
as e:
1519 msg.warning(
"failed to open file: {0}, {1}".format(filename, e))
1523 fields = line.split(separator)
1524 fields = [
''.join(filter(str.isprintable, field))
for field
in fields]
1532 for field
in fields:
1536 tabledict[key].append(float(field))
1557 if key.endswith(
'\n'):
1563 keys = header.split(separator)
1567 if key.endswith(
'\n'):
1572 return tabledict, keylist
1589 defaultOptions = True,
1590 extraOptionsList = None,
1591 AthenaSerialisedConfigurationFile = "athenaConf.pkl",
1592 returnFormat = "string"
1598 suppressionFilesAndCorrespondingPathEnvironmentVariables = {
1599 "etc/valgrind-root.supp":
"ROOTSYS",
1600 "Gaudi.supp":
"DATAPATH",
1601 "oracleDB.supp":
"DATAPATH",
1602 "valgrindRTT.supp":
"DATAPATH",
1603 "root.supp":
"DATAPATH"
1605 optionsList = [
"valgrind"]
1608 optionsList.append(
"--num-callers=30")
1609 optionsList.append(
"--tool=memcheck")
1610 optionsList.append(
"--leak-check=full")
1611 optionsList.append(
"--smc-check=all")
1613 if extraOptionsList:
1614 for option
in extraOptionsList:
1615 optionsList.append(option)
1617 for suppressionFile, pathEnvironmentVariable
in suppressionFilesAndCorrespondingPathEnvironmentVariables.items():
1618 suppFile =
findFile(os.environ[pathEnvironmentVariable], suppressionFile)
1620 optionsList.append(
"--suppressions=" + suppFile)
1622 msg.warning(
"Bad path to suppression file: {sfile}, {path} not defined".format(
1623 sfile = suppressionFile, path = pathEnvironmentVariable)
1625 optionsList.append(
"$(which python)")
1626 optionsList.append(
"$(which athena.py)")
1627 optionsList.append(AthenaSerialisedConfigurationFile)
1629 if returnFormat
is None or returnFormat ==
"string":
1630 return(
" ".join(optionsList))
1631 elif returnFormat ==
"list":
1635 "error: invalid Valgrind command format request (requested " +
1636 "format: {format}; valid formats: string, list)".format(
1637 format = returnFormat
1655 defaultOptions = True,
1656 extraOptionsList = None,
1657 AthenaCommand = ["athena.py", "athenaConf.pkl"],
1658 returnFormat = "string"
1665 optionsList = [
"setsid",
"vtune"]
1668 optionsList.append(
"-run-pass-thru=--no-altstack")
1669 optionsList.append(
"-mrte-mode=native")
1671 isCollectSpecified=
False
1672 if extraOptionsList:
1673 for option
in extraOptionsList:
1674 optionsList.append(option)
1675 if option.startswith(
"-collect="):
1676 isCollectSpecified=
True
1677 if not isCollectSpecified:
1678 optionsList.append(
"-collect=hotspots")
1679 optionsList.append(
"--")
1680 optionsList.extend(AthenaCommand)
1682 if returnFormat
is None or returnFormat ==
"string":
1683 return(
" ".join(optionsList))
1684 elif returnFormat ==
"list":
1688 "error: invalid VTune command format request (requested " +
1689 "format: {format}; valid formats: string, list)".format(
1690 format = returnFormat
1698 cpuTime =
reduce(
lambda x1, x2: x1+x2, list(
map(
lambda x1, x2: x2-x1, start[2:4], stop[2:4])))
1706 wallTime = stop[4] - start[4]
1712 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1714 s.bind((host, port))
1715 except socket.error
as e:
1717 print(
"Port %s is already in use" %port)
1734 regExp = re.compile(
r'ISF_SimEventFi[lter|...]+\s.*INFO.*accepted\s*(?P<events>[0-9]*)\s*out of\s*(?P<total>[0-9]*).*')
1737 except IOError
as e:
1738 msg.warning(
'Failed to open transform logfile {0}: {1:s}'.format(log, e))
1743 for line, lineCounter
in myGen:
1744 m = regExp.match(line)
1746 passed_events += int(m.group(
'events'))
1747 total_events += int(m.group(
'total'))
1748 resimevents = passed_events
1750 if resimevents
is not None:
1751 msg.info(
"Summary of events passed the ISF_SimEventFilter: {0} events of total {1}".format(passed_events, total_events) )
1753 msg.warning(
"Returning null value for the resimevents. No line matched with the regExp for extracting events passed the ISF_SimEventFilter")
static void reduce(HepMC::GenEvent *ge, HepMC::GenParticle *gp)
Remove an unwanted particle from the event, collapsing the graph structure consistently.
void print(char *figname, TCanvas *c1)
JobGroup: a set of Job objects and pieces of information relevant to a given set of Job objects.
__init__(self, jobs=None, name=None, timeout=None)
initialisation method
printout(self)
print in a human-readable way the items of the object self
__str__(self)
return an object self-description string @detail This method returns an object description string co...
timeoutStatus(self)
return Boolean JobGroup timeout status
Job: a set of pieces of information relevant to a given work function.
__init__(self, workFunction=None, workFunctionKeywordArguments={}, workFunctionTimeout=None, name=None)
initialisation method
printout(self)
print in a human-readable way the items of the object self
workFunctionKeywordArguments
__str__(self)
return an object self description string
ParallelJobProcessor: a multiple-process processor of Job objects.
__init__(self, jobSubmission=None, numberOfProcesses=multiprocessing.cpu_count(), startMethod="fork")
initialisation method that accepts submissions and starts pool
submit(self, jobSubmission=None)
submit a Job object or a JobGroup object for processing
statusReport(self)
return a status report string
printout(self)
print in a human-readable way the items of the object self
__str__(self)
return an object self-description string
_terminate(self)
terminate parallel job processor
_abort(self)
abort parallel job processor
list listOfNamesOfRemainingJobs
getResults(self)
get results of JobGroup object submission
extractFromTable(self, table, x_name, y_name)
getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5)
fit(self, x, y, model='linear')
Fitting function For a linear model: y(x) = slope * x + intersect.
sum_square_dev(self, data)
chi2(self, observed, expected)
Extract a table of data from a txt file.
getTable(self, filename, header=None, separator="\t")
_defineTableDictKeys(self, header, fields, separator)
Define the keys for the tabledict dictionary.
void mean(std::vector< double > &bins, std::vector< double > &values, const std::vector< std::string > &files, const std::string &histname, const std::string &tplotname, const std::string &label="")
std::string replace(std::string s, const std::string &s2, const std::string &s3)
std::vector< std::string > split(const std::string &s, const std::string &t=":")
releaseIsOlderThan(major, minor=None)
Test (to the best of our knowledge) if the current release is older than a major, minor version number...
isodate()
Return isoformated 'now' string.
ValgrindCommand(defaultOptions=True, extraOptionsList=None, AthenaSerialisedConfigurationFile="athenaConf.pkl", returnFormat="string")
shQuoteStrings(strArray=sys.argv)
Quote a string array so that it can be echoed back on the command line in a cut 'n' paste safe way.
listChildren(psTree=None, parent=os.getpid(), listOrphans=False)
Find all the children of a particular PID (calls itself recursively to descend into each leaf)
unpackDBRelease(tarball, dbversion=None)
Ensure that the DBRelease tarball has been unpacked.
pickledDump(argdict)
Dump a list of arguments to the pickle file given in the 'dumpPickle' argument.
reportEventsPassedSimFilter(log)
summarize events that passed the ISF_SimEventFilter @detail this function sums up all events that passed the ISF_SimEventFilter...
cliToKey(option)
Convert a command line option to the dictionary key that will be used by argparse.
asetupReport()
Return a string with a report of the current athena setup.
cmpMetadata(metadata1, metadata2, guidCheck='valid')
Compare metadata for files, but taking into account that GUID can vary.
asetupReleaseIsOlderThan(asetup_string, major, minor=None)
Test (to the best of our knowledge) if the asetup release is older than a major, minor version number...
lineByLine(filename, strip=True, removeTimestamp=True, substepName=None)
Generator to return lines and line count from a file.
JSONDump(argdict)
Dump a list of arguments to the JSON file given in the 'dumpJSON' argument.
getAncestry(listMyOrphans=False)
List all processes and parents and form a dictionary where the parent key lists all child PIDs.
uniqueIdentifier()
return a URL-safe, base 64-encoded pseudorandom UUID
cvmfsDBReleaseCheck(dbrelease)
Validate a DBRelease exists on cvmfs and return the path to the setup script.
VTuneCommand(defaultOptions=True, extraOptionsList=None, AthenaCommand=["athena.py", "athenaConf.pkl"], returnFormat="string")
return VTune command @detail This function returns a VTune command for use with Athena.
calcWallTime(start, stop)
forceToAlphaNum(string)
Strip a string down to alpha-numeric characters only.
printHR(the_object)
print in a human-readable way the items of a given object
call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10)
initialise_processes()
initialisation procedure for processes of the process pool
findFile(pathvar, fname)
Find a named file along a colon separated PATH type variable.
setupDBRelease(setup)
Run a DBRelease setup.
convertToStr(in_string)
Recursively convert unicode to str, useful when we have just loaded something from json (TODO: make t...
unpackTarFile(filename, directory=".")
Unpack a given tarfile.
prettyXML(element, indent=' ', poolFileCatalogFormat=False)
XML pretty print an ElementTree.ELement object.
units(quantity=None, unitSingular="unit", unitPlural="units")
return either singular or plural units as appropriate for a given quantity
infanticide(childPIDs=None, sleepTime=3, message=True, listOrphans=False)
Kill all PIDs.