import os.path as path

msg = logging.getLogger(__name__)

import PyJobTransforms.trfExceptions as trfExceptions

from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
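# Transform initialisation: sets up the transform name, the argument parser and
# the job report object that accompanies every transform run.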
def __init__(self, standardSignalHandlers=True, standardTrfArgs=True, standardValidationArgs=True,
             trfName=None, executor=None, exeArgs=None, description=''):
    '''Transform class initialiser'''
    msg.debug('Welcome to ATLAS job transforms')

    self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
    self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                               argument_default=argparse.SUPPRESS,
                               fromfile_prefix_chars='@')
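    # fromfile_prefix_chars='@' is stock argparse behaviour: an argument of the
    # form '@somefile' is replaced by the arguments read from that file.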
    if standardValidationArgs:

    if executor is not None:

    self._report = trfJobReport(parentTrf=self)

    if standardSignalHandlers:
        msg.debug('Standard signal handlers established')
msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
return trfExit.nameToCode('TRF_UNKNOWN')

msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
transformSetupCpuTime = None
return transformSetupCpuTime

transformSetupWallTime = None
return transformSetupWallTime

inFileValidationCpuTime = None
return inFileValidationCpuTime

inFileValidationWallTime = None
return inFileValidationWallTime

outFileValidationCpuTime = None
return outFileValidationCpuTime

outFileValidationWallTime = None
return outFileValidationWallTime
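# The timing properties above return None until the corresponding setup or
# validation phase has recorded its CPU/wall times.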
if executor.conf.firstExecutor:
    nEvts = executor.eventCount

if isinstance(executors, transformExecutor):
    executors = [executors]
elif not isinstance(executors, (list, tuple, set)):
        'Transform was initialised with an executor which was not a simple executor or an executor set')

for executor in executors:
        'Transform has been initialised with two executors with the same name ({0})'
        ' - executor names must be unique'.format(executor.name))
inputFiles = outputFiles = False

if k.startswith('input') and isinstance(v, argFile):
elif k.startswith('output') and isinstance(v, argFile):
msg.debug("CLI Input files: {0}; Output files: {1}".format(inputFiles, outputFiles))
msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))

for k, v in dict(tag.trfs[0]).items():
    if inputFiles and k.startswith('input'):
        msg.debug('Suppressing argument {0} from AMI'
                  ' because input files have been specified on the command line'.format(k))

    if outputFiles and k.startswith('output'):
        msg.debug('Suppressing argument {0} from AMI'
                  ' because output files have been specified on the command line'.format(k))
extraParameters.update(updateDict)

msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
jsonParams = json.load(argfile)
msg.debug('Read: {0}'.format(jsonParams))

except Exception as e:
updateDict['athenaMPMergeTargetSize'] = '*:0'
updateDict['checkEventCount'] = False
updateDict['outputFileValidation'] = False
extraParameters.update(updateDict)
argsList = [i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-')]
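# argsList holds the argument names given explicitly on the command line; these
# always win over extra parameters picked up from AMI or JSON (see the
# 'Ignored ... as extra parameter' branch below).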
for k, v in extraParameters.items():
    msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
    if k not in self.parser._argClass and k not in self.parser._argAlias:

    if k in self.parser._argAlias:
        msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
        k = self.parser._argAlias[k]

    msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
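    # Note: '__call__' in dir(x) is an older spelling of callable(x).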
    if '__call__' in dir(self.parser._argClass[k]):
if isinstance(v, argument):
elif isinstance(v, list):
    if isinstance(it, argument):

msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))

msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))

msg.critical('Argument parsing failure: {0!s}'.format(e))

msg.critical('AMI failure: {0!s}'.format(e))
if self._argdict['loglevel'] in stdLogLevels:
    msg.info("Loglevel option found - setting root logger level to %s",
             logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
else:
    msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
msg.debug('Entering transform execution phase')

msg.info('Resolving execution graph')

print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
if msg.level <= logging.DEBUG:
    print("  {0} -> {1}".format(exe.inData, exe.outData))

msg.info('Starting to trace execution path')

print('Executor path is:')
for dataType in [data for data in self._executorGraph.data if 'NULL' not in data]:
    msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))

    fileName = 'tmp.' + dataType
    for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
        stdArgName = prefix + dataType + suffix
        if stdArgName in self.parser._argClass:
            msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
    if 'HIST' in fileName:
        self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
    else:
        self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
    msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
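    # At this point every data type in the execution graph has an entry in
    # self._dataDictionary, falling back to a temporary plain argFile (or
    # argHISTFile for histogram data) when no matching argument was defined.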
msg.error("MPI mode is not supported for jobs with more than one execution step!")
msg.error(f"We have {len(self._executorPath)}: {self._executorPath}")

executor.conf.setFromTransform(self)

msg.debug('Now preparing to execute {0}'.format(executionStep))
executor.preExecute(input=executionStep['input'], output=executionStep['output'])

executor.postExecute()

new_data_dict = {**self._dataDictionary, **trfMPITools.mpiConfig["outputs"]}
executor.conf._dataDictionary = new_data_dict
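# Overlay the MPI-specific outputs onto the transform's data dictionary before
# handing it to the executor configuration.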
msg.debug('Transform executor succeeded')

msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))

msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))

m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
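# The regex splits argument keys such as 'inputESDFile' into a prefix ('input',
# 'output' or 'tmp'), the data type ('ESD') and an optional 'File' suffix.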
if isinstance(value, argFile):
    if m.group(1) == 'input':

elif isinstance(value, list) and value and isinstance(value[0], argFile):
    if m.group(1) == 'input':

msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
if 'splitConfig' not in self._argdict:

baseStepName = executionStep['name']
if baseStepName in split:

msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))

for i in range(splitting):
    name = baseStepName + executorStepSuffix + str(i)
    step = copy.deepcopy(baseStep)
    executor = copy.deepcopy(baseExecutor)
    executor.conf.executorStep = i
    executor.conf.totalExecutorSteps = splitting
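# Each substep clone gets a unique name (base name + executorStepSuffix + index)
# and its own deep-copied step and executor, so no state is shared between substeps.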
        'Execution path finding resulted in no substeps being executed'
        ' (Did you correctly specify input data for this transform?)')
steeringDict = self._argdict['steering'].value
for substep, steeringValues in steeringDict.items():
    if executor.name == substep or executor.substep == substep:
        msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
        for steeringValue in steeringValues:
            if steeringValue[0] == 'in':
                startSet = executor.inData
            else:
                startSet = executor.outData
            origLen = len(startSet)
            msg.debug('Data values to be modified are: {0}'.format(startSet))
            if steeringValue[1] == '+':
                startSet.add(steeringValue[2])
                if len(startSet) != origLen + 1:
                        'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
            else:
                startSet.discard(steeringValue[2])
                if len(startSet) != origLen - 1:
                        'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
            msg.debug('Updated data values to: {0}'.format(startSet))
        'This transform has no executor/substep {0}'.format(substep))
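# Steering values are (direction, operator, dataType) tuples: 'in'/'out' selects
# the executor's inData or outData set and '+'/'-' adds or removes the data type,
# e.g. ('in', '+', 'RDO') would add RDO to a substep's inputs (illustrative value).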
if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
def generateReport(self, reportType=None, fast=False, fileReport=defaultFileReport):
    msg.debug('Transform report generator')
    if 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
        msg.debug("Not in rank 0 -- not generating reports")
    if reportType is not None:
        msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
    reportType = self._argdict['reportType'].value
    if reportType is None:
        reportType = ['json']
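        # JSON is the only report produced by default; further formats are
        # appended below depending on the environment (e.g. gpickle when TZHOME is set).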
        if 'TZHOME' in os.environ:
            reportType.append('gpickle')
        if not isInteractiveEnv():
            reportType.append('text')
            msg.debug('Detected non-interactive environment - enabled text report')
    if 'reportName' in self._argdict:
        baseName = classicName = self._argdict['reportName'].value
    else:
        baseName = 'jobReport'
        classicName = 'metadata'
    if reportType is None or 'text' in reportType:
        envName = baseName if 'reportName' in self._argdict else 'env'
        self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)

    if reportType is None or 'json' in reportType:
        self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)

    if reportType is None or 'classic' in reportType:
        self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)

    if reportType is None or 'gpickle' in reportType:
        self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)

    if reportType is None or 'pilotPickle' in reportType:
        self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
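    # Report writing failures must not pass silently: a timeout or any other
    # exception below forces the transform to exit with a non-zero code.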
    msg.error('Received timeout when writing report ({0})'.format(reportException))
    msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
    if 'orphanKiller' in self._argdict:

    sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))
    msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
    msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
    msg.critical('Job reports are likely to be missing or incomplete - sorry')
    msg.critical('Please report this as a transforms bug!')
    msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
    msg.critical('Now exiting with a transform internal error code')
    if 'orphanKiller' in self._argdict:

    sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
msg.critical('Transform received signal {0}'.format(signum))
msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
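# Even when killed by a signal the transform logs the interrupted stack and
# still attempts to write its reports before exiting.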
msg.critical('Attempting to write reports with known information...')
if 'orphanKiller' in self._argdict:

self.validation.update(newValidationOptions)

return self.validation

if key in self.validation:
    return self.validation[key]
msg.debug('Looking for file arguments matching: io={0}'.format(io))

if isinstance(arg, argFile):
    msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
    if io is not None and arg.io != io:
        continue
    msg.debug('Argument {0} matches criteria'.format(argName))
if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)):
    msg.info('Standard input file validation turned off for transform %s.', self.name)
else:
    msg.info('Validating input files')
    if 'parallelFileValidation' in self._argdict:
        trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value)
    else:
        trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
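# Output file validation mirrors the input case, but adds parallel and
# multithreaded validation modes and skips validation on non-rank-0 MPI workers.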
if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)):
    msg.info('Standard output file validation turned off for transform %s.', self.name)
elif 'mpi' in self.argdict and not trfMPITools.mpiShouldValidate():
    msg.info("MPI mode and not in rank 0 ∴ not validating partial outputs")
else:
    msg.info('Validating output files')
    parparallelMode = False
    parmultithreadedMode = True
    if 'parallelFileValidation' in self._argdict:
        parparallelMode = self._argdict['parallelFileValidation'].value
    if 'multithreadedFileValidation' in self._argdict:
        parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
    trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)