import argparse
import copy
import json
import logging
import os
import os.path as path
import re
import sys
import traceback

msg = logging.getLogger(__name__)

import PyJobTransforms.trfValidation as trfValidation
import PyJobTransforms.trfExceptions as trfExceptions

from PyJobTransforms.trfArgClasses import trfArgParser, argFile, argHISTFile, argument
from PyJobTransforms.trfArgs import addStandardTrfArgs, addFileValidationArguments, addValidationArguments
from PyJobTransforms.trfExe import transformExecutor
from PyJobTransforms.trfExeStepTools import executorStepSuffix, getTotalExecutorSteps
from PyJobTransforms.trfExitCodes import trfExit
from PyJobTransforms.trfLogger import stdLogLevels
from PyJobTransforms.trfReports import trfJobReport, defaultFileReport
from PyJobTransforms.trfUtils import isInteractiveEnv
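
# The transform class is the framework's core object: it owns the argument
# parser, the executor set and executor graph, and the job report, and it
# drives the parse / execute / validate / report life cycle.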
def __init__(self, standardSignalHandlers = True, standardTrfArgs = True, standardValidationArgs=True,
             trfName = None, executor = None, exeArgs = None, description = ''):
    '''Transform class initialiser'''
    msg.debug('Welcome to ATLAS job transforms')
    ...
    # Transform name defaults to the name of the script being run, minus any '.py' suffix
    self._name = trfName or path.basename(sys.argv[0]).rsplit('.py', 1)[0]
    ...
    self.parser = trfArgParser(description='Transform {0}. {1}'.format(self.name, description),
                               argument_default=argparse.SUPPRESS,
                               fromfile_prefix_chars='@')
    if standardTrfArgs:
        addStandardTrfArgs(self.parser)
    if standardValidationArgs:
        addValidationArguments(self.parser)
        addFileValidationArguments(self.parser)
    ...
    if executor is not None:
        self.appendToExecutorSet(executor)
    ...
    self._report = trfJobReport(parentTrf = self)
    ...
    if standardSignalHandlers:
        ...
        msg.debug('Standard signal handlers established')
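
# The exitCode and exitMsg property getters warn and fall back to safe
# defaults when read before the transform has actually set an exit state.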
msg.warning('Transform exit code getter: _exitCode is unset, returning "TRF_UNKNOWN"')
return trfExit.nameToCode('TRF_UNKNOWN')

msg.warning('Transform exit message getter: _exitMsg is unset, returning empty string')
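
# Each of the following timing property getters computes its value on demand
# and returns None when the required start/stop timestamps are unavailable.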
transformSetupCpuTime = None
...
return transformSetupCpuTime

transformSetupWallTime = None
...
return transformSetupWallTime

inFileValidationCpuTime = None
...
return inFileValidationCpuTime

inFileValidationWallTime = None
...
return inFileValidationWallTime

outFileValidationCpuTime = None
...
return outFileValidationCpuTime

outFileValidationWallTime = None
...
return outFileValidationWallTime
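
# The transform event count is taken from the first executor in the chain.
# appendToExecutorSet() below normalises its argument to an iterable of
# executors and enforces that executor names are unique within the transform.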
if executor.conf.firstExecutor:
    nEvts = executor.eventCount

if isinstance(executors, transformExecutor):
    executors = [executors,]
elif not isinstance(executors, (list, tuple, set)):
    raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                   'Transform was initialised with an executor which was not a simple executor or an executor set')

for executor in executors:
    if executor.name in self._executorDictionary:
        raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                       'Transform has been initialised with two executors with the same name ({0})'
                                                       ' - executor names must be unique'.format(executor.name))
    ...
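
# Before merging AMI tag arguments, the transform records which input and
# output files were given explicitly on the command line; those always take
# precedence over values coming from the AMI tag.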
inputFiles = outputFiles = False
for k, v in self._argdict.items():
    if k.startswith('input') and isinstance(v, argFile):
        inputFiles = True
    elif k.startswith('output') and isinstance(v, argFile):
        outputFiles = True
msg.debug("CLI Input files: {0}; Output files {1}".format(inputFiles, outputFiles))

msg.debug('Given AMI tag configuration {0}'.format(self._argdict['AMIConfig']))
...
for k, v in dict(tag.trfs[0]).items():
    ...
    if inputFiles and k.startswith('input'):
        msg.debug('Suppressing argument {0} from AMI'
                  ' because input files have been specified on the command line'.format(k))
        continue
    if outputFiles and k.startswith('output'):
        msg.debug('Suppressing argument {0} from AMI'
                  ' because output files have been specified on the command line'.format(k))
        continue
    updateDict[k] = v
extraParameters.update(updateDict)
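
# Arguments may also arrive as a JSON file (argJSON); these are decoded and
# folded into the same extraParameters dictionary as the AMI arguments.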
try:
    msg.debug('Given JSON encoded arguments in {0}'.format(self._argdict['argJSON']))
    argfile = open(self._argdict['argJSON'], 'r')
    jsonParams = json.load(argfile)
    msg.debug('Read: {0}'.format(jsonParams))
    ...
except Exception as e:
    ...

updateDict['athenaMPMergeTargetSize'] = '*:0'
updateDict['checkEventCount'] = False
updateDict['outputFileValidation'] = False
extraParameters.update(updateDict)
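
# Extra parameters (from AMI or JSON) are now validated against the parser:
# unknown arguments are fatal, aliases are resolved, and anything already
# given explicitly on the command line takes precedence.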
argsList = [ i.split("=", 1)[0].lstrip('-') for i in args if i.startswith('-') ]
for k, v in extraParameters.items():
    msg.debug('Found this extra argument: {0} with value: {1} ({2})'.format(k, v, type(v)))
    if k not in self.parser._argClass and k not in self.parser._argAlias:
        ...
    # Resolve any argument alias to its canonical name
    if k in self.parser._argAlias:
        msg.debug('Resolving alias from {0} to {1}'.format(k, self.parser._argAlias[k]))
        k = self.parser._argAlias[k]
    # Arguments given explicitly on the command line win over extra parameters
    if k in argsList:
        msg.debug('Ignored {0}={1} as extra parameter because this argument was given on the command line.'.format(k, v))
        continue
    if '__call__' in dir(self.parser._argClass[k]):
        ...
if isinstance(v, argument):
    ...
elif isinstance(v, list):
    for it in v:
        if isinstance(it, argument):
            ...

msg.info('Now dumping pickled version of command line to {0}'.format(self._argdict['dumpPickle']))
...
msg.info('Now dumping JSON version of command line to {0}'.format(self._argdict['dumpJSON']))
...
msg.critical('Argument parsing failure: {0!s}'.format(e))
...
msg.critical('AMI failure: {0!s}'.format(e))
if self._argdict['loglevel'] in stdLogLevels:
    msg.info("Loglevel option found - setting root logger level to %s",
             logging.getLevelName(stdLogLevels[self._argdict['loglevel']]))
    ...
else:
    msg.warning('Unrecognised loglevel ({0}) given - ignored'.format(self._argdict['loglevel']))
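
# Execution phase: resolve the executor graph, trace the execution path for
# the requested input/output data, then run each substep in order.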
msg.debug('Entering transform execution phase')
...
msg.info('Resolving execution graph')
...
print("Executor Step: {0} (alias {1})".format(exe.name, exe.substep))
if msg.level <= logging.DEBUG:
    print(" {0} -> {1}".format(exe.inData, exe.outData))
...
msg.info('Starting to trace execution path')
...
print('Executor path is:')
...
for dataType in [ data for data in self._executorGraph.data if 'NULL' not in data ]:
    if dataType in self._dataDictionary:
        msg.debug('Data type {0} maps to existing argument {1}'.format(dataType, self._dataDictionary[dataType]))
    else:
        fileName = 'tmp.' + dataType
        for (prefix, suffix) in (('tmp', ''), ('output', 'File'), ('input', 'File')):
            stdArgName = prefix + dataType + suffix
            if stdArgName in self.parser._argClass:
                msg.debug('Matched data type {0} to argument {1}'.format(dataType, stdArgName))
                ...
        if 'HIST' in fileName:
            self._dataDictionary[dataType] = argHISTFile(fileName, io='temporary', type=dataType.lower())
        else:
            self._dataDictionary[dataType] = argFile(fileName, io='temporary', type=dataType.lower())
            msg.debug('Did not find any argument matching data type {0} - setting to plain argFile: {1}'.format(dataType, self._dataDictionary[dataType]))
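
# Main executor loop: each execution step is configured from the transform,
# then run through its preExecute / execute / postExecute sequence, with
# executor exceptions mapped onto transform exit states.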
executor.conf.setFromTransform(self)
...
msg.debug('Now preparing to execute {0}'.format(executionStep))
...
executor.preExecute(input = executionStep['input'], output = executionStep['output'])
...
executor.postExecute()
...
msg.debug('Transform executor succeeded')
...
msg.warning('Transform executor signaled NEEDCHECK condition: {0}'.format(e.errMsg))
...
msg.critical('Transform executor raised %s: %s' % (e.__class__.__name__, e.errMsg))
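
# Argument keys are decomposed with a regular expression so that, e.g.,
# 'inputESDFile' is recognised as the 'input' flavour of the ESD data type.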
m = re.match(r'(input|output|tmp)([A-Za-z0-9_]+?)(File)?$', key)
...
if isinstance(value, argFile):
    if m.group(1) == 'input':
        ...
elif isinstance(value, list) and value and isinstance(value[0], argFile):
    if m.group(1) == 'input':
        ...
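
# Steering directives modify the executor graph, and 'splitConfig' replaces a
# single execution step by several identical substeps, one per split index.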
msg.debug('Now applying steering to graph: {0}'.format(self._argdict['steering'].value))
...

if 'splitConfig' not in self._argdict:
    return
...
baseStepName = executionStep['name']
if baseStepName in split:
    ...
msg.info('Splitting {0} into {1} substeps'.format(executionStep, splitting))
...
for i in range(splitting):
    name = baseStepName + executorStepSuffix + str(i)
    step = copy.deepcopy(baseStep)
    ...
    executor = copy.deepcopy(baseExecutor)
    ...
    executor.conf.executorStep = i
    executor.conf.totalExecutorSteps = splitting
...
raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_SETUP'),
                                            'Execution path finding resulted in no substeps being executed '
                                            '(Did you correctly specify input data for this transform?)')
steeringDict = self._argdict['steering'].value
for substep, steeringValues in steeringDict.items():
    ...
    for executor in self._executors:
        if executor.name == substep or executor.substep == substep:
            ...
            msg.debug('Updating {0} with {1}'.format(executor.name, steeringValues))
            for steeringValue in steeringValues:
                if steeringValue[0] == 'in':
                    startSet = executor.inData
                else:
                    startSet = executor.outData
                origLen = len(startSet)
                msg.debug('Data values to be modified are: {0}'.format(startSet))
                if steeringValue[1] == '+':
                    startSet.add(steeringValue[2])
                    if len(startSet) != origLen + 1:
                        raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                            'Attempting to add data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype already there?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                else:
                    startSet.discard(steeringValue[2])
                    if len(startSet) != origLen - 1:
                        raise trfExceptions.TransformGraphException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                            'Attempting to remove data type {0} from {1} {2} fails (original set of data: {3}). Was this datatype even present?'.format(steeringValue[2], executor.name, steeringValue[1], startSet))
                msg.debug('Updated data values to: {0}'.format(startSet))
    ...
    raise trfExceptions.TransformSetupException(trfExit.nameToCode('TRF_GRAPH_STEERING_ERROR'),
                                                'This transform has no executor/substep {0}'.format(substep))
if not hasattr(self, '_executorPath') or len(self._executorPath) == 0:
    ...
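
# Report generation: the report type can come from the caller or be overridden
# on the command line; by default a JSON report is written, with extra formats
# in specific environments (e.g., the gpickle report when running at Tier-0).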
def generateReport(self, reportType=None, fast=False, fileReport = defaultFileReport):
    msg.debug('Transform report generator')

    if 'reportType' in self._argdict:
        if reportType is not None:
            msg.info('Transform requested report types {0} overridden by command line to {1}'.format(reportType, self._argdict['reportType'].value))
        reportType = self._argdict['reportType'].value

    if reportType is None:
        reportType = ['json', ]
        # The gpickle report is only produced in the Tier-0 environment
        if 'TZHOME' in os.environ:
            reportType.append('gpickle')
        if not isInteractiveEnv():
            reportType.append('text')
            msg.debug('Detected Non-Interactive environment. Enabled text report')
    if 'reportName' in self._argdict:
        baseName = classicName = self._argdict['reportName'].value
    else:
        baseName = 'jobReport'
        classicName = 'metadata'
    try:
        if reportType is None or 'text' in reportType:
            envName = baseName if 'reportName' in self._argdict else 'env'
            self._report.writeTxtReport(filename='{0}.txt'.format(envName), fast=fast, fileReport=fileReport)
        if reportType is None or 'json' in reportType:
            self._report.writeJSONReport(filename='{0}.json'.format(baseName), fast=fast, fileReport=fileReport)
        if reportType is None or 'classic' in reportType:
            self._report.writeClassicXMLReport(filename='{0}.xml'.format(classicName), fast=fast)
        if reportType is None or 'gpickle' in reportType:
            self._report.writeGPickleReport(filename='{0}.gpickle'.format(baseName), fast=fast)
        if reportType is None or 'pilotPickle' in reportType:
            self._report.writePilotPickleReport(filename='{0}Extract.pickle'.format(baseName), fast=fast, fileReport=fileReport)
    except trfExceptions.TransformTimeoutException as reportException:
        msg.error('Received timeout when writing report ({0})'.format(reportException))
        msg.error('Report writing is aborted - sorry. Transform will exit with TRF_METADATA_CALL_FAIL status.')
        if 'orphanKiller' in self._argdict:
            ...
        sys.exit(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'))

    except Exception as reportException:
        msg.critical('Attempt to write job report failed with exception {0!s}: {1!s}'.format(reportException.__class__.__name__, reportException))
        msg.critical('Stack trace now follows:\n{0}'.format(traceback.format_exc()))
        msg.critical('Job reports are likely to be missing or incomplete - sorry')
        msg.critical('Please report this as a transforms bug!')
        msg.critical('Before calling the report generator the transform status was: {0}; exit code {1}'.format(self._exitMsg, self._exitCode))
        msg.critical('Now exiting with a transform internal error code')
        if 'orphanKiller' in self._argdict:
            ...
        sys.exit(trfExit.nameToCode('TRF_INTERNAL'))
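
# Signal handler: log the signal and the stack, then try to salvage whatever
# report information is already known before the transform dies.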
msg.critical('Transform received signal {0}'.format(signum))
msg.critical('Stack trace now follows:\n{0!s}'.format(''.join(traceback.format_stack(frame))))
...
msg.critical('Attempting to write reports with known information...')
...
if 'orphanKiller' in self._argdict:
    ...
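
# Validation options behave like a dictionary: they can be updated in bulk,
# read back as a whole, or queried key by key.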
self.validation.update(newValidationOptions)
...
return self.validation
...
if key in self.validation:
    return self.validation[key]
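
# Helper that walks the argument dictionary and collects the argFile
# instances matching the requested io direction ('input', 'output', ...).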
msg.debug('Looking for file arguments matching: io={0}'.format(io))
for argName, arg in self._argdict.items():
    if isinstance(arg, argFile):
        msg.debug('Argument {0} is argFile type ({1!s})'.format(argName, arg))
        if io is not None and arg.io != io:
            continue
        msg.debug('Argument {0} matches criteria'.format(argName))
        ...
if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipInputFileValidation' in self._argdict and self._argdict['skipInputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('inputFileValidation' in self._argdict and self._argdict['inputFileValidation'].value is False)):
    msg.info('Standard input file validation turned off for transform %s.', self.name)
else:
    msg.info('Validating input files')
    if 'parallelFileValidation' in self._argdict:
        trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input', parallelMode=self._argdict['parallelFileValidation'].value)
    else:
        trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='input')
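
# Output file validation mirrors the input case, with additional parallel and
# multithreaded modes driven by command line flags.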
if (('skipFileValidation' in self._argdict and self._argdict['skipFileValidation'] is True) or
        ('skipOutputFileValidation' in self._argdict and self._argdict['skipOutputFileValidation'] is True) or
        ('fileValidation' in self._argdict and self._argdict['fileValidation'].value is False) or
        ('outputFileValidation' in self._argdict and self._argdict['outputFileValidation'].value is False)):
    msg.info('Standard output file validation turned off for transform %s.', self.name)
else:
    msg.info('Validating output files')
    parparallelMode = False
    parmultithreadedMode = False
    if 'parallelFileValidation' in self._argdict:
        parparallelMode = self._argdict['parallelFileValidation'].value
    if 'multithreadedFileValidation' in self._argdict:
        parmultithreadedMode = self._argdict['multithreadedFileValidation'].value
    trfValidation.performStandardFileValidation(dictionary=self._dataDictionary, io='output', parallelMode=parparallelMode, multithreadedMode=parmultithreadedMode)
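
# For orientation, a typical transform entry point script drives this class
# roughly as follows (a minimal sketch, assuming an athenaExecutor from
# PyJobTransforms.trfExe; the executor name and skeleton file below are
# illustrative only, not taken from this module):
#
#   import sys
#   from PyJobTransforms.transform import transform
#   from PyJobTransforms.trfExe import athenaExecutor
#
#   trf = transform(executor=athenaExecutor(name='MyExampleTf',
#                                           skeletonFile='MyPackage/skeleton.MyExample.py'))
#   trf.parseCmdLineArgs(sys.argv[1:])   # fills trf._argdict, merges AMI/JSON extras
#   trf.execute()                        # runs the traced executor path
#   trf.generateReport()                 # writes jobReport.json and friends
#   sys.exit(trf.exitCode)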