import multiprocessing

from datetime import datetime
from subprocess import Popen, STDOUT, PIPE
from xml.dom import minidom
from xml.parsers.expat import ExpatError
from xml.etree import ElementTree

import PyJobTransforms.trfExceptions as trfExceptions

from functools import reduce

msg = logging.getLogger(__name__)
    msg.debug('Finding full path for {fileName} in path {path}'.format(fileName=fname, path=pathvar))
    if fname.startswith('/'):
        return fname  # reconstructed: an absolute path is returned as given

    pathElements = pathvar.split(':')
    for pathElement in pathElements:
        if path.exists(path.join(pathElement, fname)):
            return path.join(pathElement, fname)
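# Usage sketch (illustrative; not part of the original module): findFile
# searches a ':'-separated path list for a file and returns the first match:
#   findFile('/usr/local/bin:/usr/bin:/bin', 'env')   # -> '/usr/bin/env' on a typical Linux box
#   findFile(os.environ['DATAPATH'], 'myCalib.db')    # first hit along DATAPATH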
    psCmd = ['ps', 'ax', '-o', 'pid,ppid,pgid,args', '-m']

    try:  # reconstructed
        msg.debug('Executing %s', psCmd)
        p = Popen(psCmd, stdout=PIPE, stderr=PIPE)
        stdout = p.communicate()[0]
    except OSError as e:  # reconstructed
        msg.error('Failed to execute "ps" to get process ancestry: %s', repr(e))

    for line in stdout.decode().split('\n'):
        (pid, ppid, pgid, cmd) = line.split(None, 3)

        if ppid in childDict:  # reconstructed guard for the append/create pair below
            childDict[ppid].append(pid)
        else:
            childDict[ppid] = [pid]
        if listMyOrphans and ppid == 1 and pgid == myPgid:
            msg.info("Adding PID {0} to list of my children as it seems to be orphaned: {1}".format(pid, cmd))
            if myPid in childDict:
                childDict[myPid].append(pid)
            else:
                childDict[myPid] = [pid]
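# The resulting childDict maps each parent PID to the list of its direct
# children, so the process tree can be walked recursively, e.g. a shape like
# (illustrative values only):
#   {1: [620, 871], 871: [995], 995: [996, 997]}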
def listChildren(psTree = None, parent = os.getpid(), listOrphans = False):
    '''Take a psTree dictionary and list all children'''
    # n.b. the parent default is evaluated once, at import time
    msg.debug("List children of %d (%s)", parent, psTree.get(parent, []))
    children.extend(psTree[parent])
    for child in psTree[parent]:
        ...
def infanticide(childPIDs = None, sleepTime = 3, message = True, listOrphans = False):
    if childPIDs is None:
        childPIDs = listChildren(listOrphans=listOrphans)  # reconstructed default

    if len(childPIDs) > 0 and message:
        msg.info('Killing these child processes: {0}...'.format(childPIDs))

    for pid in childPIDs:
        os.kill(pid, signal.SIGTERM)

    time.sleep(sleepTime)

    for pid in childPIDs:
        os.kill(pid, signal.SIGKILL)
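# Usage sketch (illustrative): reap everything this process spawned, including
# orphans re-parented to init that stayed in our process group:
#   infanticide(listChildren(listOrphans = True))
# The SIGTERM -> sleep -> SIGKILL sequence above gives children a chance to
# exit cleanly before they are killed outright.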
def call(args, bufsize=0, executable=None, stdin=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0, message="", logger=msg, loglevel=None, timeout=None, retry=2, timefactor=1.5, sleeptime=10):
    # inner logging helpers; structure reconstructed around the surviving lines
    def logProc(p):
        line = p.stdout.readline()
        if line:
            line = "%s%s" % (message, line.rstrip())
            if loglevel is not None:
                logger.log(loglevel, line)

    def flushProc(p):
        line = p.stdout.readline()
        while line:
            line = "%s%s" % (message, line.strip())
            logger.log(loglevel, line)
            line = p.stdout.readline()

    if loglevel is None:  # reconstructed
        loglevel = logging.DEBUG
    if timeout is None or timeout <= 0:
        msg.info('Executing %s...', args)
        starttime = time.time()
        p = Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
        while p.poll() is None:
            logProc(p)  # reconstructed loop body
        if timeout is not None:
            msg.info('Executed call within %d s.', time.time() - starttime)
    else:  # reconstructed: retry loop with increasing sleep
        for n in range(retry + 1):  # reconstructed
            msg.info('Try %i out of %i (time limit %ss) to call %s.', n + 1, retry + 1, timeout, args)
            starttime = time.time()
            endtime = starttime + timeout
            p = Popen(args=args, bufsize=bufsize, executable=executable, stdin=stdin, stdout=PIPE, stderr=STDOUT, preexec_fn=preexec_fn, close_fds=close_fds, shell=shell, cwd=cwd, env=env, universal_newlines=universal_newlines, startupinfo=startupinfo, creationflags=creationflags)
            while p.poll() is None and time.time() < endtime:
                logProc(p)  # reconstructed loop body

            msg.warning('Timeout limit of %d s reached. Kill subprocess and its children.', timeout)

            msg.info('Checking if something is left in buffer.')

            msg.info('Going to sleep for %d s.', sleeptime)
            time.sleep(sleeptime)

            sleeptime *= timefactor

            msg.info('Executed call within %d s.', time.time() - starttime)

        msg.warning('All %i tries failed!', n)
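# Usage sketch (illustrative): run a command, echoing its output through the
# transform logger and retrying with a growing sleep if it exceeds the timeout:
#   call(['ls', '-l'], message = 'ls: ', loglevel = logging.INFO,
#        timeout = 60, retry = 2, timefactor = 1.5, sleeptime = 10)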
    eVars = ['AtlasBaseDir', 'AtlasProject', 'AtlasVersion', 'AtlasPatch', 'AtlasPatchVersion', 'CMTCONFIG', 'TestArea']
    if "AtlasProject" in os.environ:
        CMake_Platform = "{0}_PLATFORM".format(os.environ["AtlasProject"])
        if CMake_Platform in os.environ:
            eVars.remove("CMTCONFIG")
            eVars.append(CMake_Platform)
    for eVar in eVars:  # reconstructed loop header
        if eVar in os.environ:
            setupMsg += '\t%s=%s\n' % (eVar, os.environ[eVar])

    if 'WorkDir_DIR' in os.environ and os.access(os.environ['WorkDir_DIR'], os.R_OK):
        ...
    else:  # reconstructed
        setupMsg += "No readable patch area found"

    return setupMsg.rstrip()
    if 'AtlasVersion' not in os.environ or 'AtlasBaseDir' not in os.environ:
        msg.warning("Could not find 'AtlasVersion' and 'AtlasBaseDir' in the environment - no release match possible")

    relRegExp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
    relMatch = re.match(relRegExp, os.environ['AtlasVersion'])

    leafDir = path.basename(os.environ['AtlasBaseDir'])
    relMatch = re.match(relRegExp, leafDir)

    msg.info('No identifiable numbered release found from AtlasVersion or AtlasBaseDir - assuming dev/devval/mig')

    relmajor = int(relMatch.group('major'))
    relminor = int(relMatch.group('minor'))
    msg.info('Detected release major {0}, minor {1} (.{2}) from environment'.format(relmajor, relminor, relMatch.group('other')))

    if minor is None or relminor >= minor:
        ...

    except Exception as e:
        msg.warning('Exception thrown when attempting to detect athena version ({0}). No release check possible'.format(e))
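# Example of what the version regexp above extracts (illustrative): an
# AtlasVersion of '21.0.23' yields major 21, minor 0, other '23', so the
# surrounding release check (releaseIsOlderThan in the full module) compares
# (21, 0) against the requested (major, minor) pair.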
        split_string = asetup_string.split(',')

        if 'master' in split_string:
            ...

        reg_exp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)\.(?P<other>.*)')
        for part in split_string:
            match = re.match(reg_exp, part)
            relmajor = int(match.group('major'))
            relminor = int(match.group('minor'))
            msg.info('Detected asetup release {0}.{1}(.{2})'.format(relmajor, relminor, match.group('other')))

        reg_exp = re.compile(r'(?P<major>\d+)\.(?P<minor>\d+)')
        for part in split_string:
            match = re.match(reg_exp, part)
            relmajor = int(match.group('major'))
            relminor = int(match.group('minor'))
            msg.info('Detected asetup release {0}.{1}'.format(relmajor, relminor))

        raise RuntimeError('asetup version could not be parsed')

        if minor is None or relminor >= minor:
            ...

    except Exception as e:
        msg.warning('Exception thrown when attempting to detect asetup athena version ({0}) from {1}. No release check possible'.format(e, asetup_string))
    return ["'" + qstring.replace("'", "\\'") + "'" for qstring in strArray]
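# Quoting example (illustrative): each string is wrapped in single quotes and
# embedded single quotes are backslash-escaped, so  don't  ->  'don\'t'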
def lineByLine(filename, strip=True, removeTimestamp=True, substepName=None):
    encargs = {'encoding': 'utf8'}
    f = open(filename, 'r', **encargs)

    if substepName and isinstance(substepName, str):
        # n.b. str.lstrip() removes any leading characters from the given set,
        # not a literal prefix
        line = line.lstrip(substepName)

        line = line.lstrip('0123456789:-, ')

    yield line, linecounter
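# Usage sketch (illustrative): lineByLine is a generator yielding
# (line, lineNumber) pairs, optionally stripping substep prefixes and
# leading timestamps:
#   for line, lineNo in lineByLine('log.EVNTtoHITS'):
#       ...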
def prettyXML(element, indent = ' ', poolFileCatalogFormat = False):
    xmlstring = ElementTree.tostring(element, 'utf-8')

    metadataDoc = minidom.parseString(xmlstring)

    msg.warning('Error parsing ElementTree string - will try removing hex literals ({0!r})'.format(xmlstring))
    xmlstring = xmlstring.replace(b'\x00', b'')  # tostring() returned bytes, so strip NULs with bytes literals
    metadataDoc = minidom.parseString(xmlstring)

    if poolFileCatalogFormat is False:
        return metadataDoc.toprettyxml(indent=indent, encoding='UTF-8')

    imp = minidom.DOMImplementation()
    doctype = imp.createDocumentType(qualifiedName='POOLFILECATALOG', publicId='', systemId='InMemory')
    doc = imp.createDocument(None, 'POOLFILECATALOG', doctype)

    refel = doc.documentElement
    for child in metadataDoc.childNodes:
        if child.nodeType == child.ELEMENT_NODE:
            doc.replaceChild(doc.importNode(child, True), doc.documentElement)

        elif child.nodeType != child.DOCUMENT_TYPE_NODE:
            doc.insertBefore(doc.importNode(child, True), refel)

    return doc.toprettyxml(indent=indent, encoding='UTF-8')
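# Usage sketch (illustrative):
#   xml = prettyXML(rootElement, indent = '  ', poolFileCatalogFormat = True)
# returns UTF-8 encoded, indented XML; with poolFileCatalogFormat=True the
# document is re-rooted under the POOLFILECATALOG doctype built above.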
    return datetime.now().replace(microsecond=0).isoformat()
    if string is None or string.isalnum():
        return string  # reconstructed: nothing to strip

    msg.warning("String {0} was stripped to alphanumeric characters only: {1}".format(string, newstring))
    allFiles = set(metadata1) | set(metadata2)
    if len(allFiles) > len(metadata1) or len(allFiles) > len(metadata2):
        msg.warning('In metadata comparison file lists are not equal - fails ({0} != {1})'.format(metadata1, metadata2))

    for fname in allFiles:
        allKeys = set(metadata1[fname]) | set(metadata2[fname])
        if len(allKeys) > len(metadata1[fname]) or len(allKeys) > len(metadata2[fname]):
            msg.warning('In metadata comparison key lists are not equal - fails')

        for key in allKeys:  # reconstructed loop header
            if key == 'file_guid':
                if guidCheck == 'ignore':
                    ...
                elif guidCheck == 'equal':
                    if metadata1[fname]['file_guid'].upper() == metadata2[fname]['file_guid'].upper():
                        ...
                    else:  # reconstructed
                        msg.warning('In metadata comparison strict GUID comparison failed.')
                elif guidCheck == 'valid':
                    try:  # reconstructed
                        uuid.UUID(metadata1[fname]['file_guid'])
                        uuid.UUID(metadata2[fname]['file_guid'])
                    except ValueError:  # reconstructed
                        msg.warning('In metadata comparison found invalid GUID strings.')

            if metadata1[fname][key] != metadata2[fname][key]:
                msg.warning('In metadata comparison found different key values: {0!s} != {1!s}'.format(metadata1[fname][key], metadata2[fname][key]))
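# The metadata arguments are nested dicts keyed by file name and then by
# metadata key, e.g. (illustrative shape):
#   {'AOD.pool.root': {'file_guid': '...', 'nentries': 100}}
# guidCheck selects 'ignore', 'equal' (strict match) or 'valid' (well-formed
# UUIDs only), as handled above.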
        tar = tarfile.open(filename)
        tar.extractall(path=directory)

    except Exception as e:
        errMsg = 'Error encountered while unpacking {0} to {1}: {2}'.format(filename, directory, e)
    if dbversion is None:
        dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', path.basename(tarball))

        'Could not find a valid version in the DBRelease tarball: {0}'.format(tarball)
        dbversion = dbdMatch.group(1)
    dbsetup = path.abspath(path.join("DBRelease", dbversion, "setup.py"))
    if os.access(dbsetup, os.R_OK):
        msg.debug('DBRelease {0} is already unpacked, found {1}'.format(tarball, dbsetup))
        return False, dbsetup

    msg.debug('Will attempt to unpack DBRelease {0}'.format(tarball))

    msg.info('DBRelease {0} was unpacked'.format(tarball))
    if not os.access(dbsetup, os.R_OK):
        'DBRelease setup file {0} was not readable, even after unpacking {1}'.format(dbsetup, tarball)
        dbdir = path.abspath(path.dirname(setup))
        msg.debug('Will add {0} to sys.path to load DBRelease setup module'.format(dbdir))

        sys.path.insert(0, dbdir)
        from setup import Setup

        msg.debug('DBRelease setup module was initialised successfully')
    except ImportError as e:
        errMsg = 'Import error while trying to load DB Setup module: {0}'.format(e)

    except Exception as e:
        errMsg = 'Unexpected error while trying to load DB Setup module: {0}'.format(e)
    dbdMatch = re.match(r'([\d\.]+|current)$', dbrelease)
    msg.debug('Attempting to setup DBRelease {0} from cvmfs'.format(dbrelease))

    if 'VO_ATLAS_SW_DIR' in os.environ:
        msg.debug('Found site defined path to ATLAS software: {0}'.format(os.environ['VO_ATLAS_SW_DIR']))
        dbsetup = path.join(os.environ['VO_ATLAS_SW_DIR'], 'database', 'DBRelease', dbrelease, 'setup.py')
        if os.access(dbsetup, os.R_OK):
            ...

        msg.warning('Site defined path to ATLAS software seems invalid (failed to access {0}). Will also try standard cvmfs path.'.format(dbsetup))

    msg.debug('Using standard CVMFS path to ATLAS software')

    dbsetup = path.join('/cvmfs/atlas.cern.ch/repo/sw/database/DBRelease', dbrelease, 'setup.py')
    if not os.access(dbsetup, os.R_OK):
        'CVMFS DBRelease setup file {0} was not readable'.format(dbsetup)
    msg.debug('Using cvmfs based dbrelease: {0}'.format(path.dirname(dbsetup)))

    'Unable to interpret DBRelease "{0}" as either a tarball or a CVMFS release directory'.format(dbrelease)
    if 'dumpPickle' not in argdict:
        return  # reconstructed: nothing requested

    from PyJobTransforms.trfArgClasses import argument  # reconstructed import for the isinstance() check
    theArgumentDictionary = {}
    for k, v in argdict.items():
        if k == 'dumpPickle':
            continue  # reconstructed
        if isinstance(v, argument):
            theArgumentDictionary[k] = getattr(v, "dumpvalue", v.value)
        else:
            theArgumentDictionary[k] = v
    with open(argdict['dumpPickle'], 'wb') as pickleFile:
        import pickle
        pickle.dump(theArgumentDictionary, pickleFile)
    if 'dumpJSON' not in argdict:
        return  # reconstructed: nothing requested

    from PyJobTransforms.trfArgClasses import argument  # reconstructed import for the isinstance() check
    theArgumentDictionary = {}
    for k, v in argdict.items():

        if isinstance(v, argument):
            theArgumentDictionary[k] = getattr(v, "dumpvalue", v.value)
        else:
            theArgumentDictionary[k] = v
    with open(argdict['dumpJSON'], 'w') as JSONFile:
        json.dump(theArgumentDictionary, JSONFile, sort_keys=True, indent=2)
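# Usage sketch (illustrative): both dumpers serialise the argument dictionary,
# unwrapping transform argument objects to their dumpvalue/value first:
#   argdict['dumpJSON'] = 'jobArgs.json'
#   JSONDump(argdict)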
    if isinstance(in_string, dict):
        ...
    elif isinstance(in_string, list):
        ...
    elif in_string.__class__.__name__ == 'unicode':
        return in_string.encode('utf-8')
    elif in_string.__class__.__name__ == 'bytes':
        return in_string.decode('utf-8')
    return option.lstrip('-').replace('-', '_')
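# Example (illustrative): cliToKey('--output-file') -> 'output_file'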
    if isinstance(the_object, dict):
        for key, value in sorted(the_object.items()):
            print(u'{key}: {value}'.format(key = key, value = value))
    elif isinstance(the_object, (list, tuple)):
        for element in the_object:
            ...
    return base64.urlsafe_b64encode(uuid.uuid4().bytes).decode().strip("=")  # decode the bytes before stripping padding
        unitSingular = "unit",
    isInteractiveEnv = False

    if 'PS1' in os.environ or 'prompt' in os.environ:
        isInteractiveEnv = True
    elif os.isatty(sys.stdout.fileno()) or os.isatty(sys.stdin.fileno()):
        isInteractiveEnv = True

    return isInteractiveEnv
        workFunctionKeywordArguments = {},
        workFunctionTimeout = None,
            exceptionMessage = "work function not specified"
            msg.error("{notifier}: exception message: {exceptionMessage}".format(
                exceptionMessage = exceptionMessage

            trfExit.nameToCode('TRF_INTERNAL'),

        descriptionString = ""
        descriptionString += str("{key}:{value} ".format(
        return descriptionString

        for job in self.jobs:
            self.timeout += job.workFunctionTimeout

        descriptionString = ""
        descriptionString += str("{key}:{value} ".format(
        return descriptionString
    signal.signal(signal.SIGINT, signal.SIG_IGN)
        jobSubmission = None,
        numberOfProcesses = multiprocessing.cpu_count(),

        msg.debug("{notifier}: status: {status}".format(

        self.pool = multiprocessing.Pool(

        msg.debug("{notifier}: pool of {numberOfProcesses} {units} created".format(
            unitSingular = "process", unitPlural = "processes")

        msg.debug("{notifier}: status: {status}".format(

        descriptionString = ""
        descriptionString += str("{key}:{value} ".format(
        return descriptionString
        if jobSubmission is not None:
            ...

        msg.debug("{notifier}: status: {status}".format(

        msg.debug("{notifier}: received job group submission '{name}' of {countOfJobs} {units}".format(
            unitSingular = "job",

        msg.debug("{notifier}: submitting job group submission '{name}' to pool".format(

        job.timeStampSubmission = time.time()
        msg.debug("{notifier}: job '{name}' submitted to pool".format(

        job.resultGetter = self.pool.apply_async(
            func = job.workFunction,
            kwds = job.workFunctionKeywordArguments
        )

        msg.debug("{notifier}: job group submission complete: {countOfJobs} {units} submitted to pool (timestamp: {timeStampSubmission})".format(
            unitSingular = "job",

        self.status = "processing"
        msg.debug("{notifier}: status: {status}".format(

        msg.debug("{notifier}: checking for job {units}".format(
            unitSingular = "result",
            unitPlural = "results")

        msg.error("{notifier}: job group '{name}' timed out".format(

        exceptionMessage = "timeout of a function in list {listOfNamesOfRemainingJobs}".format(

        msg.error("{notifier}: exception message: {exceptionMessage}".format(
            exceptionMessage = exceptionMessage

        trfExit.nameToCode('TRF_EXEC_TIMEOUT'),

        if not hasattr(job, 'result'):

            if job.resultGetter.ready():
                "{notifier}: result ready for job '{name}'".format(

                job.successStatus = job.resultGetter.successful()
                "{notifier}: job '{name}' success status: {successStatus}".format(
                    successStatus = job.successStatus

                if job.successStatus:
                    job.result = job.resultGetter.get()
                    "{notifier}: result of job '{name}': {result}".format(

                    "{notifier}: {countOfRemainingJobs} {units} remaining".format(
                        unitSingular = "job",

                elif not job.successStatus:
                    "{notifier}: job '{name}' failed".format(

                    exceptionMessage = "failure of function '{name}' with arguments {arguments}".format(
                        name = job.workFunction.__name__,
                        arguments = job.workFunctionKeywordArguments

                    msg.error("{notifier}: exception message: {exceptionMessage}".format(
                        exceptionMessage = exceptionMessage

                    trfExit.nameToCode('TRF_EXEC_FAIL'),
        msg.debug("{notifier}: all {countOfJobs} {units} complete (timestamp: {timeStampComplete})".format(
            unitSingular = "job",

        msg.debug("{notifier}: time taken to process all {countOfJobs} {units}: {processingTime}".format(
            unitSingular = "job",
        statusReport = "\n{notifier}:\n status report:".format(

        statusReport += "\n parallel job processor configuration:"
        statusReport += "\n status: {notifier}".format(

        statusReport += "\n number of processes: {notifier}".format(

        statusReport += "\n job group submission: '{notifier}'".format(

        statusReport += "\n total number of jobs: {notifier}".format(

        statusReport += "\n number of incomplete jobs: {notifier}".format(

        statusReport += "\n names of incomplete jobs: {notifier}".format(

        statusReport += "\n jobs:"

        statusReport += "\n job '{name}':".format(

        statusReport += "\n workFunction: '{name}'".format(
            name = job.workFunction.__name__

        statusReport += "\n workFunctionKeywordArguments: '{arguments}'".format(
            arguments = job.workFunctionKeywordArguments

        statusReport += "\n workFunctionTimeout: '{timeout}'".format(
            timeout = job.workFunctionTimeout

        if hasattr(job, 'result'):
            statusReport += "\n result: '{result}'".format(

        statusReport += "\n statistics:"

        statusReport += "\n total processing time: {processingTime} s".format(
        msg.debug("{notifier}: status: {status}".format(

        self.status = "terminating"
        msg.debug("{notifier}: status: {status}".format(

        msg.debug("{notifier}: terminating pool of {numberOfProcesses} {units}".format(
            unitSingular = "process",
            unitPlural = "processes"

        self.pool.terminate()

        msg.debug("{notifier}: status: {status}".format(
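# Usage sketch (illustrative; class, method and argument names taken from the
# fragments above):
#   jobGroup = JobGroup(name = "hello", jobs = [
#       Job(workFunction = hello,
#           workFunctionKeywordArguments = {},
#           workFunctionTimeout = 10)
#   ])
#   pjp = ParallelJobProcessor()
#   pjp.submit(jobSubmission = jobGroup)
#   results = pjp.getResults()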
    def fit(self, x, y, model='linear'):
        try:  # reconstructed
            self._fit = Fit(x=x, y=y, model=model)
        except Exception as e:
            msg.warning('fit failed! {0}'.format(e))

        msg.warning('Fit has not been defined')
    def getFittedData(self, filename, x_name='Time', y_name='pss', precision=2, tails=False, minPoints=5):
        table = _memFileToTable.getTable(filename, header=None, separator="\t")

        tail = int(len(x) / 5)
        msg.info('removing tails from the memory monitor data; 20% from each side')

        if len(x) == len(y) and len(x) > minPoints:
            msg.info('fitting {0} vs {1}'.format(y_name, x_name))
            try:  # reconstructed
                fit = self.fit(x, y)
                _slope = self.slope()
            except Exception as e:
                msg.warning('failed to fit data, x={0}, y={1}: {2}'.format(x, y, e))

            slope = round(fit.slope(), precision)
            chi2 = round(fit.chi2(), precision)
            fitResult = {"slope": slope, "chi2": chi2}

            msg.info('slope of the fitted line: {0} {1} (using {2} data points, chi2={3})'.format(HRslope, unit, len(x), chi2))

        msg.warning('wrong length of table data, x={0}, y={1} (must be same and length>={2})'.format(x, y, minPoints))
        headerUpperVersion = {'pss': 'PSS', 'swap': 'Swap', 'rss': 'RSS', 'vmem': 'VMEM'}
        x = table.get(x_name, [])
        if '+' not in y_name:
            y = table.get(y_name, [])

            y = table.get(headerUpperVersion[y_name], [])

            y1_name = y_name.split('+')[0]
            y2_name = y_name.split('+')[1]
            y1_value = table.get(y1_name, [])
            y2_value = table.get(y2_name, [])
            if len(y1_value) == 0 or len(y2_value) == 0:
                y1_value = table.get(headerUpperVersion[y1_name], [])
                y2_value = table.get(headerUpperVersion[y2_name], [])
        except Exception as e:
            msg.warning('exception caught: {0}'.format(e))

        y = [x0 + y0 for x0, y0 in zip(y1_value, y2_value)]
        power_labels = {1: 'K', 2: 'M', 3: 'G', 4: 'T'}

        return round(size, 2), power_labels[n] + 'B/s'
        self._model = kwargs.get('model', 'linear')
        self._x = kwargs.get('x', None)
        self._y = kwargs.get('y', None)

        if not self._x or not self._y:
            msg.warning('input data not defined')

        if len(self._x) != len(self._y):
            msg.warning('input data (lists) have different lengths')

        if self._model == 'linear':
            self._ss = self._math.sum_square_dev(self._x)

        msg.warning("'{0}' model is not implemented".format(self._model))

        y_observed = self._y

        y_expected.append(self.value(x))
        if y_observed and y_observed != [] and y_expected and y_expected != []:

        msg.warning('mean requires at least one data point')

        return sum((x - c) ** 2 for x in data)

        return sum((_x - c1) * (_y - c2) for _x, _y in zip(x, y))

    def chi2(self, observed, expected):
        return sum((_o - _e) ** 2 / _e for _o, _e in zip(observed, expected))
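# Usage sketch (illustrative; names taken from the fragments above):
#   fit = Fit(x = [1, 2, 3], y = [2.0, 4.1, 5.9], model = 'linear')
#   slope, chi2 = fit.slope(), fit.chi2()
# chi2 here is the usual Pearson statistic: the sum over data points of
# (observed - expected)**2 / expected.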
    def getTable(self, filename, header=None, separator="\t"):
        try:  # reconstructed
            f = open(filename, 'r')
        except Exception as e:
            msg.warning("failed to open file: {0}, {1}".format(filename, e))

        fields = line.split(separator)

        for field in fields:
            ...

        if key.endswith('\n'):
            ...

        keys = header.split(separator)

        if key.endswith('\n'):
            ...

        return tabledict, keylist
        defaultOptions = True,
        extraOptionsList = None,
        AthenaSerialisedConfigurationFile = "athenaConf.pkl",
        returnFormat = "string"

    suppressionFilesAndCorrespondingPathEnvironmentVariables = {
        "etc/valgrind-root.supp": "ROOTSYS",
        "Gaudi.supp": "DATAPATH",
        "oracleDB.supp": "DATAPATH",
        "valgrindRTT.supp": "DATAPATH",
        "root.supp": "DATAPATH"
    }

    optionsList = ["valgrind"]

    optionsList.append("--num-callers=30")
    optionsList.append("--tool=memcheck")
    optionsList.append("--leak-check=full")
    optionsList.append("--smc-check=all")

    if extraOptionsList:
        for option in extraOptionsList:
            optionsList.append(option)

    for suppressionFile, pathEnvironmentVariable in suppressionFilesAndCorrespondingPathEnvironmentVariables.items():
        suppFile = findFile(os.environ[pathEnvironmentVariable], suppressionFile)

        optionsList.append("--suppressions=" + suppFile)

        msg.warning("Bad path to suppression file: {sfile}, {path} not defined".format(
            sfile = suppressionFile, path = pathEnvironmentVariable))

    optionsList.append("$(which python)")
    optionsList.append("$(which athena.py)")
    optionsList.append(AthenaSerialisedConfigurationFile)

    if returnFormat is None or returnFormat == "string":
        return " ".join(optionsList)
    elif returnFormat == "list":

        "error: invalid Valgrind command format request (requested " +
        "format: {format}; valid formats: string, list)".format(
            format = returnFormat
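# Usage sketch (illustrative): build the valgrind wrapper command for a
# serialised athena job configuration:
#   cmd = ValgrindCommand(extraOptionsList = ["--error-limit=no"], returnFormat = "string")
# which yields something of the form:
#   valgrind --num-callers=30 --tool=memcheck ... $(which python) $(which athena.py) athenaConf.pkl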
        defaultOptions = True,
        extraOptionsList = None,
        AthenaSerialisedConfigurationFile = "athenaConf.pkl",
        returnFormat = "string"

    optionsList = ["vtune"]

    optionsList.append("-run-pass-thru=--no-altstack")
    optionsList.append("-mrte-mode=native")

    isCollectSpecified = False
    if extraOptionsList:
        for option in extraOptionsList:
            optionsList.append(option)
            if option.startswith("-collect="):
                isCollectSpecified = True
    if not isCollectSpecified:
        optionsList.append("-collect=hotspots")
    optionsList.append("-- $(which python)")
    optionsList.append("$(which athena.py)")
    optionsList.append(AthenaSerialisedConfigurationFile)

    if returnFormat is None or returnFormat == "string":
        return " ".join(optionsList)
    elif returnFormat == "list":

        "error: invalid VTune command format request (requested " +
        "format: {format}; valid formats: string, list)".format(
            format = returnFormat
    # start and stop are os.times() 5-tuples: indices 2 and 3 hold the
    # children's user and system CPU times, index 4 the elapsed wall clock
    cpuTime = reduce(lambda x1, x2: x1 + x2, list(map(lambda x1, x2: x2 - x1, start[2:4], stop[2:4])))

    wallTime = stop[4] - start[4]
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:  # reconstructed
        s.bind((host, port))
    except socket.error as e:
        print("Port %s is already in use" % port)
    regExp = re.compile(r'ISF_SimEventFi[lter|...]+\s.*INFO.*accepted\s*(?P<events>[0-9]*)\s*out of\s*(?P<total>[0-9]*).*')

    except IOError as e:
        msg.warning('Failed to open transform logfile {0}: {1}'.format(log, e))

    for line, lineCounter in myGen:
        m = regExp.match(line)

        passed_events += int(m.group('events'))
        total_events += int(m.group('total'))
        resimevents = passed_events

    if resimevents is not None:
        msg.info("Summary of events passed the ISF_SimEventFilter: {0} events of total {1}".format(passed_events, total_events))
    else:  # reconstructed
        msg.warning("Returning null value for the resimevents. No line matched with the regExp for extracting events passed the ISF_SimEventFilter")