17 msg = logging.getLogger(__name__)
19 import PyJobTransforms.trfExceptions
as trfExceptions
21 from PyJobTransforms.trfFileUtils import athFileInterestingKeys, AthenaLiteFileInfo, NTUPEntries, HISTEntries, PHYSVALEntries, PRWEntries, urlType, ROOTGetSize
23 from PyJobTransforms.trfExeStepTools
import commonExecutorStepName
25 from PyJobTransforms.trfDecorators
import timelimited
33 msg.debug(
'Initialised class %s with args=%s; kwargs=%s', genclass, args, kwargs)
39 msg.debug(
'Called class %s with value=%s; args=%s; kwargs=%s', self.
_genclass, valueString, self.
_args, self.
_kwargs)
47 if valueString
is None:
51 except Exception
as e:
52 msg.fatal(
'Got this exception raised when calling object factory: {0}'.
format(e))
62 def __init__(self, factory, option_strings, dest, **kwargs):
64 super().
__init__(option_strings, dest, **kwargs)
66 def __call__(self, parser, namespace, values, option_string=None):
67 msg.debug(
'Called action for factory=%s; values=%s', self.
_factory, values)
70 if isinstance(values, list):
73 setattr(namespace, self.dest, [self.
_factory(
None)])
75 setattr(namespace, self.dest, [self.
_factory(v)
for v
in values])
77 setattr(namespace, self.dest, self.
_factory(values))
85 msg.debug(
'Initialised action class %s with args=%s; kwargs=%s', genclass, args, kwargs)
92 def __call__(self, option_strings, dest, **kwargs):
109 def __init__(self, value = None, runarg = True, name = None):
149 desc = {
'type' :
None}
154 return '{0}: Value {1} (isRunArg={2})'.
format(self.__class__.__name__, self.
_value, self.
_runarg)
162 return self.
value == other.value
165 return self.
value != other.value
168 return self.
value < other.value
171 return self.
value > other.value
182 def __init__(self, value = None, runarg = True, name = None, choices = None):
184 super(argString, self).
__init__(value = value, runarg = runarg, name=name)
214 desc = {
'type' :
'str'}
242 if isinstance(value, int):
248 except ValueError
as e:
250 'Failed to convert value {0} to int: {1}'.
format(value, e))
255 desc = {
'type' :
'int'}
267 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
270 super(argFloat, self).
__init__(value = value, runarg = runarg, name=name)
280 desc = {
'type' :
'float'}
282 desc[
'min'] = self.
_min
284 desc[
'max'] = self.
_max
296 if self.
_min is not None:
302 if isinstance(value, float):
308 'Failed to convert %s to a float' %
str(value))
312 'argFloat value out of range: %g is not between %s and %s' %
336 if isinstance(value, bool):
344 desc = {
'type' :
'bool'}
356 def __init__(self, value = [], supressEmptyStrings = True, splitter=',', runarg=True, name=None):
360 super(argList, self).
__init__(value = value, runarg = runarg, name=name)
371 desc = {
'type' :
'list',
'listtype':
'str'}
381 if isinstance(value, (list, tuple)):
392 except AttributeError:
394 'Failed to convert %s to a list' %
str(value))
428 if isinstance(value, list):
430 if not isinstance(v, int):
432 'Illegal value {0} in list of ints'.
format(v))
444 except (AttributeError, ValueError):
446 'Failed to convert %s to a list of ints' %
str(value))
453 desc = {
'type' :
'list',
'listtype' :
'int'}
465 def __init__(self, value = {}, supressEmptyStrings = True, splitter=',', kvsplitter=":
", runarg=True, name=None):
470 super(argList, self).__init__(value = value, runarg = runarg, name=name)
485 if isinstance(value, dict):
486 for k, v
in value.items():
487 if not isinstance(k, str):
489 'Illegal key argument type {0} in dictionary for argKeyFloatValueList'.
format(k))
490 if not isinstance(v, float):
492 'Illegal value argument type {0} in dictionary for argKeyFloatValueList'.
format(v))
501 kvlist = [ v
for v
in value.split(self.
_splitter)
if v !=
'' ]
507 except (AttributeError, ValueError):
509 'Failed to convert {0} to a dictionary of string:int values'.
format(value))
516 desc = {
'type' :
'list',
'listtype' :
'str:float'}
544 def __init__(self, value=list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, guid=
None,
545 multipleOK =
None, name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
576 if multipleOK
is None:
585 super(argFile, self).
__init__(value=value, splitter=splitter, runarg=runarg, name=name)
617 @mergeTargetSize.setter
626 if isinstance(self.
_type, dict):
628 desc = {
'type' :
'file',
'subtype' :
"NONE" }
632 desc = {
'type' :
'file',
'subtype' :
str(self.
_type).
upper()}
647 if isinstance(value, (list, tuple)):
648 if len(value) > 0
and isinstance(value[0], dict):
656 'Filename (key "lfn") not found in Tier-0 file dictionary: {0}'.
format(myfile))
657 for k, v
in myfile.items():
662 elif k ==
'checksum':
669 'Inconsistent dataset names in Tier-0 dictionary: {0} != {1}'.
format(self.
dataset, v))
679 if value.lower().startswith(
'lfn'):
681 from PyUtils.PoolFile
import file_name
690 except (AttributeError, TypeError):
692 'Failed to convert %s to a list' %
str(value))
695 deDuplicatedValue = []
697 if fname
not in deDuplicatedValue:
698 deDuplicatedValue.append(fname)
700 msg.warning(
"Removing duplicated file {0} from file list".
format(fname))
701 if len(self.
_value) != len(deDuplicatedValue):
702 self.
_value = deDuplicatedValue
703 msg.warning(
'File list after duplicate removal: {0}'.
format(self.
_value))
714 if self.
_io ==
'input':
720 msg.debug(
'Found POSIX filesystem input - activating globbing')
722 for filename
in self.
_value:
724 globbedFiles = glob.glob(filename)
725 if len(globbedFiles) == 0:
727 'Input file argument {0} globbed to NO input files - probably the file(s) are missing'.
format(filename))
730 newValue.extend(globbedFiles)
733 msg.debug (
'File input is globbed to %s' % self.
_value)
736 msg.debug(
'Found root filesystem input - activating globbing')
738 for filename
in self.
_value:
739 if str(filename).startswith(
"root"):
740 msg.debug(
'Found input file name starting with "root," setting XRD_RUNFORKHANDLER=1, which enables fork handlers for xrootd in direct I/O')
741 os.environ[
"XRD_RUNFORKHANDLER"] =
"1"
742 if str(filename).startswith(
"https")
or str(filename).startswith(
"davs")
or not(
str(filename).endswith(
'/'))
and '*' not in filename
and '?' not in filename:
743 msg.debug(
'Seems that only one file was given: {0}'.
format(filename))
744 newValue.extend(([filename]))
749 if '*' in filename
or '?' in filename:
750 msg.debug(
'Split input into path for listdir() and a filemask to select available files.')
751 path = filename[0:filename.rfind(
'/')+1]
752 msg.debug(
'path: {0}'.
format(path))
753 fileMask = filename[filename.rfind(
'/')+1:len(filename)]
754 msg.debug(
'Will select according to: {0}'.
format(fileMask))
756 cmd = [
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select' ]
757 if not os.access (
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select', os.X_OK ):
759 'No execute access to "eos.select" - could not glob EOS input files.')
766 proc = subprocess.Popen(args = cmd,bufsize = 1, shell =
False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
768 output = proc.stdout.readlines()
771 'EOS list command ("{0!s}") failed: rc {1}, output {2}'.
format(cmd, rc, output))
772 msg.debug(
"eos returned: {0}".
format(output))
775 myFiles += [
str(path)+
str(line.rstrip(
'\n'))]
777 patt = re.compile(fileMask.replace(
'*',
'.*').
replace(
'?',
'.'))
778 for srmFile
in myFiles:
780 if(patt.search(srmFile))
is not None:
782 msg.debug(
'match: ',srmFile)
783 newValue.extend(([srmFile]))
785 newValue.extend(([srmFile]))
787 msg.debug(
'Selected files: ', newValue)
788 except (AttributeError, TypeError, OSError):
790 'Failed to convert %s to a list' %
str(value))
791 if len(self.
_value) > 0
and len(newValue) == 0:
794 'Input file argument(s) {0!s} globbed to NO input files - ls command failed')
796 msg.debug (
'File input is globbed to %s' % self.
_value)
800 'Multiple file arguments are not supported for {0} (was given: {1}'.
format(self, self.
_value))
808 if value
not in (
'input',
'output',
'temporary'):
810 'File arguments must be specified as input, output or temporary - got {0}'.
format(value))
857 m = re.match(
r'(input|output|tmp.)([A-Za-z0-9_]+?)(File)?$', value)
859 msg.debug(
"ArgFile name setter matched this: {0}".
format(m.groups()))
860 if self.
_type is None:
861 dtype = m.group(2).
split(
'_', 1)[0]
863 if re.match(
r'D(RAW|ESD|AOD)', dtype):
865 msg.debug(
"Autoset data type to {0}".
format(dtype))
868 msg.debug(
"Autoset data subtype to {0}".
format(m.group(2)))
871 msg.debug(
"ArgFile name setter did not match against '{0}'".
format(value))
893 events = self.
getSingleMetadata(fname=fname, metadataKey=
'nentries', populate =
not fast)
895 msg.debug(
'Got events=None for file {0} - returning None for this instance'.
format(fname))
897 if events ==
'UNDEFINED':
898 msg.debug(
'Got events=UNDEFINED for file {0} - returning UNDEFINED for this instance'.
format(fname))
900 if not isinstance(events, int):
901 msg.warning(
'Got unexpected events metadata for file {0}: {1!s} - returning None for this instance'.
format(fname, events))
903 totalEvents += events
914 if files == []
or '_fileMetadata' not in dir(self):
916 for fname
in self.
value:
920 if fname
in self.
value:
927 if self.
_guid is not None:
928 msg.debug(
'Now trying to set file GUID metadata using {0}'.
format(self.
_guid))
933 msg.warning(
'Explicit GUID {0} was passed for file {1}, but this file is not a member of this instance'.
format(guid, fname))
941 def getMetadata(self, files = None, metadataKeys = None, maskMetadataKeys = None, populate = True, flush = False):
945 elif isinstance(files, str):
947 msg.debug(
'getMetadata will examine these files: {0!s}'.
format(files))
949 if metadataKeys
is None:
951 elif isinstance(metadataKeys, str):
952 metadataKeys = [metadataKeys,]
953 if maskMetadataKeys
is not None:
954 metadataKeys = [k
for k
in metadataKeys
if k
not in maskMetadataKeys]
955 msg.debug(
'getMetadata will retrieve these keys: {0!s}'.
format(metadataKeys))
958 msg.debug(
'Flushing cached metadata values')
962 msg.debug(
'Checking metadata values')
968 for mdkey
in metadataKeys:
974 msg.error(
'Did not find metadata key {0!s} for file {1!s} - setting to None'.
format(mdkey, fname))
975 metadata[fname][mdkey] =
None
986 if not (isinstance(fname, str)
and isinstance(metadataKey, str)):
988 'Illegal call to getSingleMetadata function: {0!s} {1!s}'.
format(fname, metadataKey))
989 md = self.
getMetadata(files = fname, metadataKeys = metadataKey, populate = populate, flush = flush)
990 return md[fname][metadataKey]
997 msg.debug(
'Retrieving metadata keys {1!s} for files {0!s}'.
format(files, metadataKeys))
1008 for key
in metadataKeys:
1009 if key !=
'_exists':
1013 for key
in metadataKeys:
1015 msg.debug(
'Metadata key {0} is unknown for {1}'.
format(key, self.__class__.__name__))
1019 msg.debug(
'Found cached value for {0}:{1} = {2!s}'.
format(fname, key, self.
_fileMetadata[fname][key]))
1021 msg.debug(
'No cached value for {0}:{1}. Calling generator function {2} ({3})'.
format(fname, key, self.
_metadataKeys[key].__name__, self.
_metadataKeys[key]))
1024 msg.info(
"Metadata generator called to obtain {0} for {1}".
format(key, files))
1027 msg.error(
'Calling {0!s} raised an exception: {1!s}'.
format(self.
_metadataKeys[key].__name__, e))
1029 msg.warning(
'Call to function {0} for {1} file {2} failed to populate metadata key {3}'.
format(self.
_metadataKeys[key].__name__, self.__class__.__name__, fname, key))
1050 for k, v
in metadataKeys.items():
1051 msg.debug(
'Manualy setting {0} for file {1} to {2}'.
format(k, fname, v))
1062 msg.debug(
'Testing for cached values for files {0} and keys {1}'.
format(files, metadataKeys))
1065 elif isinstance(files, str):
1067 if metadataKeys
is None:
1069 elif isinstance(metadataKeys, str):
1070 metadataKeys = (metadataKeys,)
1074 for key
in metadataKeys:
1076 isCachedFlag =
False
1078 if isCachedFlag
is False:
1093 for filename
in self.
_value:
1094 if filename.find(
'#') > -1:
1095 (dataset, fname) = filename.split(
'#', 1)
1096 newValue.append(fname)
1097 msg.debug(
'Current dataset: {0}; New dataset {1}'.
format(self.
_dataset, dataset))
1100 'Found inconsistent dataset assignment in argFile setup: %s != %s' % (self.
_dataset, dataset))
1102 if len(newValue) == 0:
1104 elif len(newValue) != len (self.
_value):
1106 'Found partial dataset assignment in argFile setup from {0} (dsn#lfn notation must be uniform for all inputs)'.
format(self.
_value))
1118 except OSError
as e:
1119 msg.error(
'Got exception {0!s} raised while stating file {1}'.
format(e, fname))
1123 msg.debug(
'Calling ROOT TFile.GetSize({0})'.
format(fname))
1134 with open(fname)
as f:
1137 chunk = len(f.read(1024*1024))
1138 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1142 except OSError
as e:
1143 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1145 except UnicodeDecodeError:
1146 msg.debug(
'Problem reading file as unicode, attempting with binary')
1149 with open(fname,
'rb')
as f:
1152 chunk = len(f.read(1024*1024))
1153 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1157 except OSError
as e:
1158 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1167 msg.debug(
'Generating a GUID for file {0}'.
format(fname))
1177 msg.debug(
'Testing existance for {0}'.
format(files))
1181 size = os.stat(fname).st_size
1184 msg.debug(
'POSIX file {0} exists'.
format(fname))
1185 except OSError
as e:
1186 msg.error(
'Got exception {0!s} raised while stating file {1} - probably it does not exist'.
format(e, fname))
1190 msg.debug(
'Calling ROOT TFile.GetSize({0})'.
format(fname))
1194 msg.error(
'Non-POSIX file {0} could not be opened - probably it does not exist'.
format(fname))
1196 msg.debug(
'Non-POSIX file {0} exists'.
format(fname))
1212 for arg
in copyArgs:
1214 myargdict[arg] = copy.copy(argdict[arg])
1217 myargdict = copy.copy(argdict)
1219 myargdict[
'checkEventCount'] =
argSubstepBool(
'False', runarg=
False)
1221 if 'athenaopts' in myargdict:
1224 for subStep
in myargdict[
'athenaopts'].value:
1227 for opt
in myargdict[
'athenaopts'].value[subStep]:
1228 if opt.startswith(
'--nprocs'):
1232 elif opt.startswith(
'--threads'):
1238 if hasNprocs
and hasNthreads:
1241 if opt.startswith(
'--threads'):
1250 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1251 super(argYODAFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1257 'lheSumOfNegWeights': 0,
1261 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1266 tar = tarfile.open(fname,
"r:gz")
1268 for untar
in tar.getmembers():
1269 fileTXT = tar.extractfile(untar)
1270 if fileTXT
is not None :
1271 lines = fileTXT.read().
decode(
"utf-8")
1272 lhecount = lines.count(
'/event')
1276 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1280 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1288 tar = tarfile.open(fname,
"r:gz")
1289 for untar
in tar.getmembers():
1290 fileTXT = tar.extractfile(untar)
1292 if fileTXT
is not None :
1293 for line
in fileTXT :
1294 line = line.decode(
"utf-8")
1297 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1298 if w > 0 : weightPos += w
1299 else : weightNeg += abs(w)
1303 if "<event" in line :
1309 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1316 def __init__(self, value = list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, multipleOK =
None,
1317 name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
1318 super(argAthenaFile, self).
__init__(value=value, subtype=subtype, io=io, type=type, splitter=splitter, runarg=runarg,
1319 multipleOK=multipleOK, name=name, executor=executor, mergeTargetSize=mergeTargetSize,
1320 auxiliaryFile=auxiliaryFile)
1323 for key
in athFileInterestingKeys:
1332 msg.debug(
'Will retrieve metadata info for {0!s}'.
format(myFiles))
1340 for fname
in myFiles:
1342 if athFileMetadata
is None:
1344 msg.debug(
'Setting metadata for file {0} to {1}'.
format(fname, athFileMetadata[fname]))
1349 self.
_callAthInfo(files, doAllFiles =
True, retrieveKeys=athFileInterestingKeys)
1353 desc=super(argAthenaFile, self).prodsysDescription
1360 integrityFunction =
"returnIntegrityOfBSFile"
1365 rc=
call([
"AtlListBSEvents",
"-c", fname], logger=msg, message=
"Report by AtlListBSEvents: ", timeout=600)
1375 desc=super(argBSFile, self).prodsysDescription
1386 msg.debug(
'selfMerge attempted for {0} -> {1} with {2} (index {3})'.
format(inputs, output, argdict, counter))
1389 for fname
in inputs:
1390 if fname
not in self.
_value:
1392 "File {0} is not part of this agument: {1}".
format(fname, self))
1398 myargdict[
'maskEmptyInputs'] =
argBool(
True)
1399 myargdict[
'allowRename'] =
argBool(
True)
1400 myargdict[
'emptyStubFile'] =
argString(inputs[0])
1405 myDataDictionary = {
'BS_MRG_INPUT' :
argBSFile(inputs, type=self.
type, io=
'input'),
1406 'BS_MRG_OUTPUT' :
argBSFile(output, type=self.
type, io=
'output')}
1407 myMergeConf = executorConfig(myargdict, myDataDictionary)
1408 myMerger = bsMergeExecutor(name=
'BSMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf, exe =
'file_merging',
1409 inData=
set([
'BS_MRG_INPUT']), outData=
set([
'BS_MRG_OUTPUT']))
1410 myMerger.doAll(input=
set([
'BS_MRG_INPUT']), output=
set([
'BS_MRG_OUTPUT']))
1414 for fname
in inputs:
1418 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1427 integrityFunction =
"returnIntegrityOfPOOLFile"
1433 from PyJobTransforms.trfValidateRootFile
import checkFile
1434 rc=
checkFile(fileName=fname, the_type=
'event', requireTree=
False)
1442 desc=super(argPOOLFile, self).prodsysDescription
1451 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1454 for fname
in inputs:
1455 if fname
not in self.
_value:
1457 "File {0} is not part of this agument: {1}".
format(fname, self))
1467 myDataDictionary = {
'POOL_MRG_INPUT' :
argPOOLFile(inputs, type=self.
type, io=
'input'),
1469 myMergeConf = executorConfig(myargdict, myDataDictionary)
1470 myMerger = athenaExecutor(name=
'POOLMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1471 skeletonCA =
'RecJobTransforms.MergePool_Skeleton',
1472 inData=
set([
'POOL_MRG_INPUT']), outData=
set([
'POOL_MRG_OUTPUT']),
1473 disableMT=
True, disableMP=
True)
1474 myMerger.doAll(input=
set([
'POOL_MRG_INPUT']), output=
set([
'POOL_MRG_OUTPUT']))
1478 for fname
in inputs:
1482 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1488 integrityFunction =
"returnIntegrityOfPOOLFile"
1492 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1495 for fname
in inputs:
1496 if fname
not in self.
_value:
1498 "File {0} is not part of this agument: {1}".
format(fname, self))
1501 mySubstepName =
'HITSMergeAthenaMP{0}'.
format(counter)
1505 myDataDictionary = {
'HITS' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1507 myMergeConf = executorConfig(myargdict, myDataDictionary)
1508 myMerger = athenaExecutor(name = mySubstepName,
1509 skeletonCA =
'SimuJobTransforms.HITSMerge_Skeleton',
1511 inData=
set([
'HITS']), outData=
set([
'HITS_MRG']),
1512 disableMT=
True, disableMP=
True)
1513 myMerger.doAll(input=
set([
'HITS']), output=
set([
'HITS_MRG']))
1517 for fname
in inputs:
1521 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1528 integrityFunction =
"returnIntegrityOfPOOLFile"
1532 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1535 for fname
in inputs:
1536 if fname
not in self.
_value:
1538 "File {0} is not part of this agument: {1}".
format(fname, self))
1541 mySubstepName =
'EVNT_TRMergeAthenaMP{0}'.
format(counter)
1547 myMergeConf = executorConfig(myargdict, myDataDictionary)
1548 myMerger = athenaExecutor(name = mySubstepName, skeletonFile =
'SimuJobTransforms/skeleton.EVNT_TRMerge.py',
1550 inData=
set([
'EVNT_TR']), outData=
set([
'EVNT_TR_MRG']),
1551 disableMT=
True, disableMP=
True)
1552 myMerger.doAll(input=
set([
'EVNT_TR']), output=
set([
'EVNT_TR_MRG']))
1556 for fname
in inputs:
1560 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1567 integrityFunction =
"returnIntegrityOfPOOLFile"
1571 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1574 for fname
in inputs:
1575 if fname
not in self.
_value:
1577 "File {0} is not part of this agument: {1}".
format(fname, self))
1583 myDataDictionary = {
'RDO' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1585 myMergeConf = executorConfig(myargdict, myDataDictionary)
1586 myMerger = athenaExecutor(name =
'RDOMergeAthenaMP{0}'.
format(counter),
1587 skeletonCA =
'SimuJobTransforms.RDOMerge_Skeleton',
1589 inData=
set([
'RDO']), outData=
set([
'RDO_MRG']),
1590 disableMT=
True, disableMP=
True)
1591 myMerger.doAll(input=
set([
'RDO']), output=
set([
'RDO_MRG']))
1595 for fname
in inputs:
1599 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1605 integrityFunction =
"returnIntegrityOfPOOLFile"
1609 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1612 for fname
in inputs:
1613 if fname
not in self.
_value:
1615 "File {0} is not part of this agument: {1}".
format(fname, self))
1618 mySubstepName =
'EVNTMergeAthenaMP{0}'.
format(counter)
1622 myDataDictionary = {
'EVNT' :
argEVNTFile(inputs, type=self.
type, io=
'input'),
1624 myMergeConf = executorConfig(myargdict, myDataDictionary)
1625 myMerger = athenaExecutor(name = mySubstepName, skeletonCA =
'EvgenJobTransforms.EVNTMerge_Skeleton',
1627 inData=
set([
'EVNT']), outData=
set([
'EVNT_MRG']),
1628 disableMT=
True, disableMP=
True)
1629 myMerger.doAll(input=
set([
'EVNT']), output=
set([
'EVNT_MRG']))
1633 for fname
in inputs:
1637 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1645 integrityFunction =
"returnIntegrityOfHISTFile"
1647 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', runarg=
True, countable=
True, multipleOK =
None,
1648 name=
None, auxiliaryFile=
False):
1649 super(argHISTFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1650 name=name, auxiliaryFile=auxiliaryFile)
1669 msg.error(
'Timeout counting events for {0}'.
format(fname))
1673 desc=super(argHISTFile, self).prodsysDescription
1681 integrityFunction =
"returnIntegrityOfNTUPFile"
1686 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', treeNames=
None, runarg=
True, multipleOK =
None,
1687 name=
None, mergeTargetSize=-1, auxiliaryFile=
False):
1688 super(argNTUPFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1689 name=name, mergeTargetSize=mergeTargetSize, auxiliaryFile=auxiliaryFile)
1698 if name
and 'NTUP_PILEUP' in name:
1704 msg.debug(
'Retrieving event count for NTUP files {0}'.
format(files))
1709 if myPRWEntries
is not None:
1711 if self.
name and 'NTUP_PILEUP' in self.
name:
1712 myPRWEntries =
PRWEntries(fileName=fname, integral=
True)
1717 if myPHYSVALEntries
is not None:
1719 if self.
name and 'NTUP_PHYSVAL' in self.
name:
1721 self.
_fileMetadata[fname][
'sumOfWeights'] = myPHYSVALEntries
1723 msg.debug(
'treeNames is set to None - event count undefined for this NTUP')
1730 msg.error(
'Timeout counting events for {0}'.
format(fname))
1735 from PyJobTransforms.trfValidateRootFile
import checkFile
1736 rc=
checkFile(fileName=fname, the_type=
'basket', requireTree=
False)
1744 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1747 for fname
in inputs:
1748 if fname
not in self.
_value:
1750 "File {0} is not part of this agument: {1}".
format(fname, self))
1758 myDataDictionary = {
'NTUP_MRG_INPUT' :
argNTUPFile(inputs, type=self.
type, io=
'input'),
1760 myMergeConf = executorConfig(myargdict, myDataDictionary)
1761 myMerger = NTUPMergeExecutor(name=
'NTUPMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1762 inData=
set([
'NTUP_MRG_INPUT']), outData=
set([
'NTUP_MRG_OUTPUT']))
1763 myMerger.doAll(input=
set([
'NTUP_MRG_INPUT']), output=
set([
'NYUP_MRG_OUTPUT']))
1767 for fname
in inputs:
1771 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1777 desc=super(argNTUPFile, self).prodsysDescription
1788 f = bz2.BZ2File(fname,
'r')
1790 chunk = len(f.read(1024*1024))
1791 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1796 except OSError
as e:
1797 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1803 desc=super(argBZ2File, self).prodsysDescription
1809 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1810 super(argFTKIPFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1820 f = bz2.BZ2File(fname,
'r')
1822 if line.startswith(
'F'):
1825 except OSError
as e:
1826 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1831 desc=super(argFTKIPFile, self).prodsysDescription
1837 def __init__(self, value=list(), io =
'output', type=
'txt_evt', splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1838 super(argHepEvtAsciiFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg,
1839 multipleOK=multipleOK, name=name)
1848 f =
open(fname,
'r')
1850 if len(line.split(
" "))==3:
1853 except OSError
as e:
1854 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1859 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1860 super(argLHEFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1866 'lheSumOfNegWeights': 0,
1870 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1875 tar = tarfile.open(fname,
"r:gz")
1877 for untar
in tar.getmembers():
1878 fileTXT = tar.extractfile(untar)
1879 if fileTXT
is not None :
1880 lines = fileTXT.read().
decode(
"utf-8")
1881 lhecount = lines.count(
'/event')
1885 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1889 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1897 tar = tarfile.open(fname,
"r:gz")
1898 for untar
in tar.getmembers():
1899 fileTXT = tar.extractfile(untar)
1901 if fileTXT
is not None :
1902 lines = fileTXT.readlines()
1906 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1907 if w > 0 : weightPos += w
1908 else : weightNeg += abs(w)
1912 if "<event" in line :
1918 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1919 self.
_fileMetadata[fname][
'lheSumOfPosWeights'] =
'UNDEFINED'
1920 self.
_fileMetadata[fname][
'lheSumOfNegWeights'] =
'UNDEFINED'
1931 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', separator = ':
'):
1934 super(argSubstep, self).__init__(value, runarg, name)
1944 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
1947 elif isinstance(value, str):
1949 elif isinstance(value, (list, tuple)):
1953 if not isinstance(item, str):
1956 elif isinstance(value, dict):
1968 subStepMatch = re.match(
r'([a-zA-Z0-9,]+)' + self.
_separator +
r'(.*)', string)
1971 subStep = subStepMatch.group(1).
split(
',')
1972 subStepValue = subStepMatch.group(2)
1975 subStepValue = string
1976 msg.debug(
'Parsed {0} as substep {1}, argument {2}'.
format(string, subStep, subStepValue))
1977 for step
in subStep:
1978 subStepList.append((step, subStepValue))
1990 substep = exe.substep
1991 first = exe.conf.firstExecutor
1998 value = self.
_value[name]
1999 elif substep
in self.
_value:
2000 value = self.
_value[substep]
2001 elif first
and 'first' in self.
_value:
2002 value = self.
_value[
'first']
2003 elif 'default' in self.
_value:
2004 value = self.
_value[
'default']
2016 value = self.
_value[
'all']
2017 elif isinstance(value, list):
2018 value = self.
_value[
'all'] + value
2020 msg.debug(
'From substep argument {myvalue} picked value "{value}" for {name}, {substep}, first={first}'.
format(myvalue=self.
_value, value=value, name=name, substep=substep, first=first))
2026 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2041 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', splitter = None, separator=':
'):
2043 super(argSubstepList, self).__init__(value, runarg, name, defaultSubstep, separator)
2053 desc = {
'type':
'substep',
'substeptype':
'list',
'listtype':
'str',
2059 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2062 elif isinstance(value, str):
2064 elif isinstance(value, (list, tuple)):
2068 if not isinstance(item, str):
2071 for subStep
in subStepList:
2072 if subStep[0]
in self.
_value:
2075 self.
_value[subStep[0]] = subStep[1]
2076 elif isinstance(value, dict):
2077 for k, v
in value.items():
2078 if not isinstance(k, str):
2080 if not isinstance(v, list):
2091 subStepList = [(s[0], s[1].
split(self.
_splitter))
for s
in subStepList]
2093 subStepList = [(s[0], [s[1]])
for s
in subStepList]
2106 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2112 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2115 elif isinstance(value, str):
2117 self.
_value = dict([(subStep[0], subStep[1])
for subStep
in subStepList])
2118 elif isinstance(value, (list, tuple)):
2122 if not isinstance(item, str):
2125 for subStep
in subStepList:
2126 self.
_value[subStep[0]] = subStep[1]
2127 elif isinstance(value, dict):
2128 for k, v
in value.items():
2129 if not isinstance(k, str):
2131 if not isinstance(v, str):
2147 desc = {
'type':
'substep',
'substeptype':
'bool',
'separator': self.
_separator,
2153 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1})'.
format(value,
type(value)))
2156 elif isinstance(value, bool):
2158 elif isinstance(value, str):
2160 self.
_value = dict([(subStep[0],
strToBool(subStep[1]))
for subStep
in subStepList])
2161 elif isinstance(value, (list, tuple)):
2165 if not isinstance(item, str):
2168 for subStep
in subStepList:
2170 elif isinstance(value, dict):
2171 for k, v
in value.items():
2172 if not isinstance(k, str):
2174 if not isinstance(v, bool):
2191 desc = {
'type':
'substep',
'substeptype':
'int',
'separator': self.
_separator,
2197 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2201 elif isinstance(value, int):
2203 elif isinstance(value, str):
2205 self.
_value = dict([(subStep[0],
int(subStep[1]))
for subStep
in subStepList])
2206 elif isinstance(value, (list, tuple)):
2210 if not isinstance(item, str):
2213 for subStep
in subStepList:
2214 self.
_value[subStep[0]] =
int(subStep[1])
2215 elif isinstance(value, dict):
2216 for k, v
in value.items():
2217 if not isinstance(k, str):
2219 if not isinstance(v, int):
2232 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
2235 super(argSubstepFloat, self).
__init__(value = value, runarg = runarg, name=name)
2239 desc = {
'type':
'substep',
'substeptype':
'float',
'separator': self.
_separator,
2242 desc[
'min'] = self.
_min
2244 desc[
'max'] = self.
_max
2255 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2259 elif isinstance(value, float):
2261 elif isinstance(value, str):
2263 self.
_value = dict([(subStep[0],
float(subStep[1]))
for subStep
in subStepList])
2264 elif isinstance(value, (list, tuple)):
2268 if not isinstance(item, str):
2270 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2272 for subStep
in subStepList:
2274 elif isinstance(value, dict):
2275 for k, v
in value.items():
2276 if not isinstance(k, str):
2278 'Dictionary key {0!s} for substep is not a string'.
format(k))
2279 if not isinstance(v, float):
2281 'Dictionary value {0!s} for substep is not an float'.
format(v))
2285 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2288 if (self.
_min is not None and my_float < self.
_min)
or (self.
_max is not None and my_float > self.
_max):
2290 'argFloat value out of range: {0} is not between {1} and {2}'.
format(my_float, self.
_min, self.
_max))
2291 except ValueError
as e:
2293 'Failed to convert substep value {0} to float: {1}'.
format(value, e))
2309 'doRDO_TRIG': {
'RAWtoALL': [(
'in',
'-',
'RDO'), (
'in',
'+',
'RDO_TRIG'), (
'in',
'-',
'BS')]},
2310 'doOverlay': {
'HITtoRDO': [(
'in',
'-',
'HITS'), (
'out',
'-',
'RDO'), (
'out',
'-',
'RDO_FILT')],
2311 'Overlay': [(
'in',
'+', (
'HITS',
'RDO_BKG')), (
'out',
'+',
'RDO')]},
2312 'doFCwOverlay': {
'EVNTtoRDO': [(
'in',
'-',
'EVNT'), (
'out',
'-',
'RDO')],
2313 'EVNTtoRDOwOverlay': [(
'in',
'+', (
'EVNT',
'RDO_BKG')), (
'out',
'+',
'RDO'), (
'out',
'+',
'RDO_SGNL')]},
2314 'afterburn': {
'generate': [(
'out',
'-',
'EVNT')]},
2332 desc = {
'type':
'substep',
'substeptype':
'steering',
'listtype':
'str',
'separator': self.
_separator,
2341 msg.debug(
'Attempting to set argSubstepSteering from {0!s} (type {1})'.
format(value,
type(value)))
2345 elif isinstance(value, dict):
2347 for k, v
in value.items():
2348 if not isinstance(k, str)
or not isinstance(v, list):
2350 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2352 if not isinstance(subv, (list, tuple))
or len(subv) != 3
or subv[0]
not in (
'in',
'out')
or subv[1]
not in (
'+',
'-'):
2354 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2360 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2361 elif isinstance(value, (str, list, tuple)):
2362 if isinstance(value, str):
2364 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2368 if not isinstance(item, str):
2370 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2371 if item
in argSubstepSteering.steeringAlises:
2372 msg.debug(
"Found value {0} in steeringAlises ({1})".
format(item, argSubstepSteering.steeringAlises[item]))
2373 for substep, steerlist
in argSubstepSteering.steeringAlises[item].
items():
2374 if substep
in self.
_value:
2377 self.
_value[substep] = steerlist
2383 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2386 if string
in argSubstepSteering.steeringAlises:
2387 return argSubstepSteering.steeringAlises[string]
2391 for subvalue
in ivalue.split(
','):
2392 matchedParts = re.match(
r'(in|out)(\+|\-)([A-Z_]+)$', subvalue)
2393 if not matchedParts:
2395 'Failed to convert string {0!s} to argSubstepSteering'.
format(subvalue))
2396 retvalue.append((matchedParts.group(1), matchedParts.group(2), matchedParts.group(3)))
2408 msg.debug(
'Attempting to set argSubstepConditions from {0!s} (type {1}'.
format(value,
type(value)))
2410 super(self.__class__, self.__class__).value.fset(self, value)
2414 if "CurrentMC" == v:
2420 cmd =
"COMAGetGlobalTagNameByCurrentState --state=CurrentMC"
2421 return str(client.execute(cmd, format =
'dom_object').get_rows().pop()[
'globalTag'])
2425 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2443 super(trfArgParser, self).
__init__(*args, **kwargs)
2446 argName = args[0].lstrip(
'-')
2447 msg.debug(
'Found arg name {0}'.
format(argName))
2453 'Transform arguments may not use hyphens (use camelCase or underscore')
2458 msg.debug(
'Double definition of argument {0} - ignored'.
format(argName))
2462 if 'help' in kwargs:
2466 if 'action' in kwargs
and 'factory' in dir(kwargs[
'action']):
2467 self.
_argClass[argName] = kwargs[
'action'].factory
2468 elif 'type' in kwargs:
2469 self.
_argClass[argName] = kwargs[
'type']
2475 for arg
in (
'group',):
2477 strippedArgs[arg] = kwargs.pop(arg)
2481 for i
in range(1, len(args)):
2482 argAlias = args[i].lstrip(
'-')
2483 msg.debug(
'Adding an alias of {0}: {1}'.
format(argName, argAlias))
2487 if 'group' in strippedArgs:
2489 msg.debug(
'Adding argument to group {0}: ({1}; {2})'.
format(strippedArgs[
'group'], args, kwargs))
2493 msg.warning(
'Argument group {0} not defined - adding argument to main parser'.
format(strippedArgs[
'group']))
2494 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2495 super(trfArgParser, self).
add_argument(*args, **kwargs)
2497 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2498 super(trfArgParser, self).
add_argument(*args, **kwargs)
2504 msg.debug(
'Detected the local variable {0}'.
format(name))
2505 if argClass
is not None:
2506 desc[name] = argClass().prodsysDescription
2517 msg.warning(
'Argument group %s already exists', args[0])
2519 self.
_argGroups[args[0]] = self.add_argument_group(*args)
2532 keyArray = [
'--' +
str(key)
for key
in self.
_helpString if key
not in (
'h',
'verbose',
'loglevel',
'dumpargs',
'argdict') ]
2534 print(
'ListOfDefaultPositionalKeys={0}'.
format(keyArray))
2546 newValueObj = value[0]
2547 msg.debug(
'Started with: %s = %s',
type(newValueObj), newValueObj)
2548 if isinstance(value[0], argSubstep):
2551 elif isinstance(value[0], list):
2554 msg.debug(
'Handling a list of arguments for key')
2558 processedValueObj.value = processedValues
2559 newValues.append(processedValueObj)
2560 newValueObj = newValues
2561 return newValueObj, newValues
2562 elif isinstance(value[0].value, list):
2563 newValues = value[0].value
2564 elif isinstance(value[0].value, dict):
2565 newValues = value[0].value
2567 newValues = [value[0].value,]
2568 for valueObj
in value[1:]:
2569 msg.debug(
'Value Object: %s = %s',
type(valueObj), valueObj)
2570 if isinstance(value[0], argSubstep):
2573 elif isinstance(valueObj.value, list):
2575 newValues.extend(valueObj.value)
2576 elif isinstance(valueObj.value, dict):
2578 newValues.update(valueObj.value)
2580 newValues.append(valueObj.value)
2581 return newValueObj, newValues
2590 super(trfArgParser, self).
parse_args(args = args, namespace = namespace)
2592 namespace = super(trfArgParser, self).
parse_args(args = args)
2593 for k, v
in namespace.__dict__.items():
2594 msg.debug(
'Treating key %s (%s)', k, v)
2595 if isinstance(v, list):
2597 if not isinstance(newValueObj, list):
2598 newValueObj.value = newValues
2599 namespace.__dict__[k] = newValueObj
2600 msg.debug(
'Set to %s', newValues)
2608 msg.debug(
"converting string {string} to boolean".
format(string = string))
2609 if string.lower() ==
'false':
2611 elif string.lower() ==
'true':
2615 except AttributeError:
2629 allKeys =
set(dict1) |
set(dict2)
2633 if isinstance(
list(dict1.values())[0], list):
2635 elif len(dict2) > 0:
2636 if isinstance(
list(dict2.values())[0], list):
2640 mergeDict[key] = dict1.get(key, []) + dict2.get(key, [])
2643 if key
in dict1
and key
in dict2:
2645 if dict1[key] != dict2[key]:
2647 'Merging substep arguments found clashing values for substep {0}: {1}!={2}'.
format(key, dict1[key], dict2[key]))
2648 mergeDict[key] = dict1[key]
2650 mergeDict[key] = dict1[key]
2652 mergeDict[key] = dict2[key]