17 msg = logging.getLogger(__name__)
19 import PyJobTransforms.trfExceptions
as trfExceptions
21 from PyJobTransforms.trfFileUtils import athFileInterestingKeys, AthenaLiteFileInfo, NTUPEntries, HISTEntries, PHYSVALEntries, PRWEntries, urlType, ROOTGetSize
23 from PyJobTransforms.trfExeStepTools
import commonExecutorStepName
25 from PyJobTransforms.trfDecorators
import timelimited
33 msg.debug(
'Initialised class %s with args=%s; kwargs=%s', genclass, args, kwargs)
39 msg.debug(
'Called class %s with value=%s; args=%s; kwargs=%s', self.
_genclass, valueString, self.
_args, self.
_kwargs)
47 if valueString
is None:
51 except Exception
as e:
52 msg.fatal(
'Got this exception raised when calling object factory: {0}'.
format(e))
62 def __init__(self, factory, option_strings, dest, **kwargs):
64 super().
__init__(option_strings, dest, **kwargs)
66 def __call__(self, parser, namespace, values, option_string=None):
67 msg.debug(
'Called action for factory=%s; values=%s', self.
_factory, values)
70 if isinstance(values, list):
73 setattr(namespace, self.dest, [self.
_factory(
None)])
75 setattr(namespace, self.dest, [self.
_factory(v)
for v
in values])
77 setattr(namespace, self.dest, self.
_factory(values))
85 msg.debug(
'Initialised action class %s with args=%s; kwargs=%s', genclass, args, kwargs)
92 def __call__(self, option_strings, dest, **kwargs):
109 def __init__(self, value = None, runarg = True, name = None):
149 desc = {
'type' :
None}
154 return '{0}: Value {1} (isRunArg={2})'.
format(self.__class__.__name__, self.
_value, self.
_runarg)
162 return self.
value == other.value
165 return self.
value != other.value
168 return self.
value < other.value
171 return self.
value > other.value
182 def __init__(self, value = None, runarg = True, name = None, choices = None):
184 super(argString, self).
__init__(value = value, runarg = runarg, name=name)
214 desc = {
'type' :
'str'}
242 if isinstance(value, int):
248 except ValueError
as e:
250 'Failed to convert value {0} to int: {1}'.
format(value, e))
255 desc = {
'type' :
'int'}
267 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
270 super(argFloat, self).
__init__(value = value, runarg = runarg, name=name)
280 desc = {
'type' :
'float'}
282 desc[
'min'] = self.
_min
284 desc[
'max'] = self.
_max
296 if self.
_min is not None:
302 if isinstance(value, float):
308 'Failed to convert %s to a float' %
str(value))
312 'argFloat value out of range: %g is not between %s and %s' %
336 if isinstance(value, bool):
344 desc = {
'type' :
'bool'}
356 def __init__(self, value = [], supressEmptyStrings = True, splitter=',', runarg=True, name=None):
360 super(argList, self).
__init__(value = value, runarg = runarg, name=name)
371 desc = {
'type' :
'list',
'listtype':
'str'}
381 if isinstance(value, (list, tuple)):
392 except AttributeError:
394 'Failed to convert %s to a list' %
str(value))
428 if isinstance(value, list):
430 if not isinstance(v, int):
432 'Illegal value {0} in list of ints'.
format(v))
444 except (AttributeError, ValueError):
446 'Failed to convert %s to a list of ints' %
str(value))
453 desc = {
'type' :
'list',
'listtype' :
'int'}
465 def __init__(self, value = {}, supressEmptyStrings = True, splitter=',', kvsplitter=":
", runarg=True, name=None):
470 super(argList, self).__init__(value = value, runarg = runarg, name=name)
485 if isinstance(value, dict):
486 for k, v
in value.items():
487 if not isinstance(k, str):
489 'Illegal key argument type {0} in dictionary for argKeyFloatValueList'.
format(k))
490 if not isinstance(v, float):
492 'Illegal value argument type {0} in dictionary for argKeyFloatValueList'.
format(v))
501 kvlist = [ v
for v
in value.split(self.
_splitter)
if v !=
'' ]
507 except (AttributeError, ValueError):
509 'Failed to convert {0} to a dictionary of string:int values'.
format(value))
516 desc = {
'type' :
'list',
'listtype' :
'str:float'}
544 def __init__(self, value=list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, guid=
None,
545 multipleOK =
None, name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
576 if multipleOK
is None:
585 super(argFile, self).
__init__(value=value, splitter=splitter, runarg=runarg, name=name)
617 @mergeTargetSize.setter
626 if isinstance(self.
_type, dict):
628 desc = {
'type' :
'file',
'subtype' :
"NONE" }
632 desc = {
'type' :
'file',
'subtype' :
str(self.
_type).
upper()}
647 if isinstance(value, (list, tuple)):
648 if len(value) > 0
and isinstance(value[0], dict):
656 'Filename (key "lfn") not found in Tier-0 file dictionary: {0}'.
format(myfile))
657 for k, v
in myfile.items():
662 elif k ==
'checksum':
669 'Inconsistent dataset names in Tier-0 dictionary: {0} != {1}'.
format(self.
dataset, v))
679 if value.lower().startswith(
'lfn'):
681 from PyUtils.PoolFile
import file_name
690 except (AttributeError, TypeError):
692 'Failed to convert %s to a list' %
str(value))
695 deDuplicatedValue = []
697 if fname
not in deDuplicatedValue:
698 deDuplicatedValue.append(fname)
700 msg.warning(
"Removing duplicated file {0} from file list".
format(fname))
701 if len(self.
_value) != len(deDuplicatedValue):
702 self.
_value = deDuplicatedValue
703 msg.warning(
'File list after duplicate removal: {0}'.
format(self.
_value))
714 if self.
_io ==
'input':
720 msg.debug(
'Found POSIX filesystem input - activating globbing')
722 for filename
in self.
_value:
724 globbedFiles = glob.glob(filename)
725 if len(globbedFiles) == 0:
727 'Input file argument {0} globbed to NO input files - probably the file(s) are missing'.
format(filename))
730 newValue.extend(globbedFiles)
733 msg.debug (
'File input is globbed to %s' % self.
_value)
736 msg.debug(
'Found root filesystem input - activating globbing')
738 for filename
in self.
_value:
739 if str(filename).startswith(
"root"):
740 msg.debug(
'Found input file name starting with "root," setting XRD_RUNFORKHANDLER=1, which enables fork handlers for xrootd in direct I/O')
741 os.environ[
"XRD_RUNFORKHANDLER"] =
"1"
742 if str(filename).startswith(
"https")
or str(filename).startswith(
"davs")
or not(
str(filename).endswith(
'/'))
and '*' not in filename
and '?' not in filename:
743 msg.debug(
'Seems that only one file was given: {0}'.
format(filename))
744 newValue.extend(([filename]))
749 if '*' in filename
or '?' in filename:
750 msg.debug(
'Split input into path for listdir() and a filemask to select available files.')
751 path = filename[0:filename.rfind(
'/')+1]
752 msg.debug(
'path: {0}'.
format(path))
753 fileMask = filename[filename.rfind(
'/')+1:len(filename)]
754 msg.debug(
'Will select according to: {0}'.
format(fileMask))
756 cmd = [
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select' ]
757 if not os.access (
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select', os.X_OK ):
759 'No execute access to "eos.select" - could not glob EOS input files.')
# Open the pipe in text mode: readlines() then yields str, so the later
# line.rstrip('\n') works under Python 3, and bufsize=1 (line buffering)
# is only honoured for text-mode streams.
proc = subprocess.Popen(args=cmd, bufsize=1, shell=False, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
768 output = proc.stdout.readlines()
771 'EOS list command ("{0!s}") failed: rc {1}, output {2}'.
format(cmd, rc, output))
772 msg.debug(
"eos returned: {0}".
format(output))
775 myFiles += [
str(path)+
str(line.rstrip(
'\n'))]
# Escape the mask first so that only the glob wildcards ('*' and '?') act as
# regex operators; every other character (e.g. '.') is matched literally.
patt = re.compile(re.escape(fileMask).replace(r'\*', '.*').replace(r'\?', '.'))
778 for srmFile
in myFiles:
780 if(patt.search(srmFile))
is not None:
# The logging call needs a %s placeholder for its argument; without one the
# record fails to format and the file name is never logged.
msg.debug('match: %s', srmFile)
783 newValue.extend(([srmFile]))
785 newValue.extend(([srmFile]))
# The logging call needs a %s placeholder for its argument; without one the
# record fails to format and the selected list is never logged.
msg.debug('Selected files: %s', newValue)
788 except (AttributeError, TypeError, OSError):
790 'Failed to convert %s to a list' %
str(value))
791 if len(self.
_value) > 0
and len(newValue) == 0:
794 'Input file argument(s) {0!s} globbed to NO input files - ls command failed')
796 msg.debug (
'File input is globbed to %s' % self.
_value)
800 'Multiple file arguments are not supported for {0} (was given: {1}'.
format(self, self.
_value))
808 if value
not in (
'input',
'output',
'temporary'):
810 'File arguments must be specified as input, output or temporary - got {0}'.
format(value))
857 m = re.match(
r'(input|output|tmp.)([A-Za-z0-9_]+?)(File)?$', value)
859 msg.debug(
"ArgFile name setter matched this: {0}".
format(m.groups()))
860 if self.
_type is None:
861 dtype = m.group(2).
split(
'_', 1)[0]
863 if re.match(
r'D(RAW|ESD|AOD)', dtype):
865 msg.debug(
"Autoset data type to {0}".
format(dtype))
868 msg.debug(
"Autoset data subtype to {0}".
format(m.group(2)))
871 msg.debug(
"ArgFile name setter did not match against '{0}'".
format(value))
893 events = self.
getSingleMetadata(fname=fname, metadataKey=
'nentries', populate =
not fast)
895 msg.debug(
'Got events=None for file {0} - returning None for this instance'.
format(fname))
897 if events ==
'UNDEFINED':
898 msg.debug(
'Got events=UNDEFINED for file {0} - returning UNDEFINED for this instance'.
format(fname))
900 if not isinstance(events, int):
901 msg.warning(
'Got unexpected events metadata for file {0}: {1!s} - returning None for this instance'.
format(fname, events))
903 totalEvents += events
914 if files == []
or '_fileMetadata' not in dir(self):
916 for fname
in self.
value:
920 if fname
in self.
value:
927 if self.
_guid is not None:
928 msg.debug(
'Now trying to set file GUID metadata using {0}'.
format(self.
_guid))
933 msg.warning(
'Explicit GUID {0} was passed for file {1}, but this file is not a member of this instance'.
format(guid, fname))
941 def getMetadata(self, files = None, metadataKeys = None, maskMetadataKeys = None, populate = True, flush = False):
945 elif isinstance(files, str):
947 msg.debug(
'getMetadata will examine these files: {0!s}'.
format(files))
949 if metadataKeys
is None:
951 elif isinstance(metadataKeys, str):
952 metadataKeys = [metadataKeys,]
953 if maskMetadataKeys
is not None:
954 metadataKeys = [k
for k
in metadataKeys
if k
not in maskMetadataKeys]
955 msg.debug(
'getMetadata will retrieve these keys: {0!s}'.
format(metadataKeys))
958 msg.debug(
'Flushing cached metadata values')
962 msg.debug(
'Checking metadata values')
968 for mdkey
in metadataKeys:
974 msg.error(
'Did not find metadata key {0!s} for file {1!s} - setting to None'.
format(mdkey, fname))
975 metadata[fname][mdkey] =
None
986 if not (isinstance(fname, str)
and isinstance(metadataKey, str)):
988 'Illegal call to getSingleMetadata function: {0!s} {1!s}'.
format(fname, metadataKey))
989 md = self.
getMetadata(files = fname, metadataKeys = metadataKey, populate = populate, flush = flush)
990 return md[fname][metadataKey]
997 msg.debug(
'Retrieving metadata keys {1!s} for files {0!s}'.
format(files, metadataKeys))
1008 for key
in metadataKeys:
1009 if key !=
'_exists':
1013 for key
in metadataKeys:
1015 msg.debug(
'Metadata key {0} is unknown for {1}'.
format(key, self.__class__.__name__))
1019 msg.debug(
'Found cached value for {0}:{1} = {2!s}'.
format(fname, key, self.
_fileMetadata[fname][key]))
1021 msg.debug(
'No cached value for {0}:{1}. Calling generator function {2} ({3})'.
format(fname, key, self.
_metadataKeys[key].__name__, self.
_metadataKeys[key]))
1024 msg.info(
"Metadata generator called to obtain {0} for {1}".
format(key, files))
1027 msg.error(
'Calling {0!s} raised an exception: {1!s}'.
format(self.
_metadataKeys[key].__name__, e))
1029 msg.warning(
'Call to function {0} for {1} file {2} failed to populate metadata key {3}'.
format(self.
_metadataKeys[key].__name__, self.__class__.__name__, fname, key))
1050 for k, v
in metadataKeys.items():
1051 msg.debug(
'Manualy setting {0} for file {1} to {2}'.
format(k, fname, v))
1062 msg.debug(
'Testing for cached values for files {0} and keys {1}'.
format(files, metadataKeys))
1065 elif isinstance(files, str):
1067 if metadataKeys
is None:
1069 elif isinstance(metadataKeys, str):
1070 metadataKeys = (metadataKeys,)
1074 for key
in metadataKeys:
1076 isCachedFlag =
False
1078 if isCachedFlag
is False:
1093 for filename
in self.
_value:
1094 if filename.find(
'#') > -1:
1095 (dataset, fname) = filename.split(
'#', 1)
1096 newValue.append(fname)
1097 msg.debug(
'Current dataset: {0}; New dataset {1}'.
format(self.
_dataset, dataset))
1100 'Found inconsistent dataset assignment in argFile setup: %s != %s' % (self.
_dataset, dataset))
1102 if len(newValue) == 0:
1104 elif len(newValue) != len (self.
_value):
1106 'Found partial dataset assignment in argFile setup from {0} (dsn#lfn notation must be uniform for all inputs)'.
format(self.
_value))
1118 except OSError
as e:
1119 msg.error(
'Got exception {0!s} raised while stating file {1}'.
format(e, fname))
1123 msg.debug(
'Calling ROOT TFile.GetSize({0})'.
format(fname))
1134 with open(fname)
as f:
1137 chunk = len(f.read(1024*1024))
1138 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1142 except OSError
as e:
1143 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1145 except UnicodeDecodeError:
1146 msg.debug(
'Problem reading file as unicode, attempting with binary')
1149 with open(fname,
'rb')
as f:
1152 chunk = len(f.read(1024*1024))
1153 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1157 except OSError
as e:
1158 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1167 msg.debug(
'Generating a GUID for file {0}'.
format(fname))
1177 msg.debug(
'Testing existance for {0}'.
format(files))
1181 size = os.stat(fname).st_size
1184 msg.debug(
'POSIX file {0} exists'.
format(fname))
1185 except OSError
as e:
1186 msg.error(
'Got exception {0!s} raised while stating file {1} - probably it does not exist'.
format(e, fname))
1190 msg.debug(
'Calling ROOT TFile.GetSize({0})'.
format(fname))
1194 msg.error(
'Non-POSIX file {0} could not be opened - probably it does not exist'.
format(fname))
1196 msg.debug(
'Non-POSIX file {0} exists'.
format(fname))
1212 for arg
in copyArgs:
1214 myargdict[arg] = copy.copy(argdict[arg])
1217 myargdict = copy.copy(argdict)
1219 myargdict[
'checkEventCount'] =
argSubstepBool(
'False', runarg=
False)
1221 if 'athenaopts' in myargdict:
1224 for subStep
in myargdict[
'athenaopts'].value:
1227 for opt
in myargdict[
'athenaopts'].value[subStep]:
1228 if opt.startswith(
'--nprocs'):
1232 elif opt.startswith(
'--threads'):
1238 if hasNprocs
and hasNthreads:
1241 if opt.startswith(
'--threads'):
1250 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1251 super(argYODAFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1257 'lheSumOfNegWeights': 0,
1261 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1266 tar = tarfile.open(fname,
"r:gz")
1268 for untar
in tar.getmembers():
1269 fileTXT = tar.extractfile(untar)
1270 if fileTXT
is not None :
1271 lines = fileTXT.read().
decode(
"utf-8")
1272 lhecount = lines.count(
'/event')
1276 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1280 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1288 tar = tarfile.open(fname,
"r:gz")
1289 for untar
in tar.getmembers():
1290 fileTXT = tar.extractfile(untar)
1292 if fileTXT
is not None :
1293 for line
in fileTXT :
1294 line = line.decode(
"utf-8")
1297 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1298 if w > 0 : weightPos += w
1299 else : weightNeg += abs(w)
1303 if "<event" in line :
1309 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1316 def __init__(self, value = list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, multipleOK =
None,
1317 name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
1318 super(argAthenaFile, self).
__init__(value=value, subtype=subtype, io=io, type=type, splitter=splitter, runarg=runarg,
1319 multipleOK=multipleOK, name=name, executor=executor, mergeTargetSize=mergeTargetSize,
1320 auxiliaryFile=auxiliaryFile)
1323 for key
in athFileInterestingKeys:
1332 msg.debug(
'Will retrieve metadata info for {0!s}'.
format(myFiles))
1340 for fname
in myFiles:
1342 if athFileMetadata
is None:
1344 msg.debug(
'Setting metadata for file {0} to {1}'.
format(fname, athFileMetadata[fname]))
1349 self.
_callAthInfo(files, doAllFiles =
True, retrieveKeys=athFileInterestingKeys)
1353 desc=super(argAthenaFile, self).prodsysDescription
1360 integrityFunction =
"returnIntegrityOfBSFile"
1365 rc=
call([
"AtlListBSEvents",
"-c", fname], logger=msg, message=
"Report by AtlListBSEvents: ", timeout=600)
1375 desc=super(argBSFile, self).prodsysDescription
1386 msg.debug(
'selfMerge attempted for {0} -> {1} with {2} (index {3})'.
format(inputs, output, argdict, counter))
1389 for fname
in inputs:
1390 if fname
not in self.
_value:
1392 "File {0} is not part of this agument: {1}".
format(fname, self))
1398 myargdict[
'maskEmptyInputs'] =
argBool(
True)
1399 myargdict[
'allowRename'] =
argBool(
True)
1400 myargdict[
'emptyStubFile'] =
argString(inputs[0])
1405 myDataDictionary = {
'BS_MRG_INPUT' :
argBSFile(inputs, type=self.
type, io=
'input'),
1406 'BS_MRG_OUTPUT' :
argBSFile(output, type=self.
type, io=
'output')}
1407 myMergeConf = executorConfig(myargdict, myDataDictionary)
1408 myMerger = bsMergeExecutor(name=
'BSMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf, exe =
'file_merging',
1409 inData=
set([
'BS_MRG_INPUT']), outData=
set([
'BS_MRG_OUTPUT']))
1410 myMerger.doAll(input=
set([
'BS_MRG_INPUT']), output=
set([
'BS_MRG_OUTPUT']))
1414 for fname
in inputs:
1418 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1427 integrityFunction =
"returnIntegrityOfPOOLFile"
1433 from PyJobTransforms.trfValidateRootFile
import checkFile
1434 rc=
checkFile(fileName=fname, the_type=
'event', requireTree=
False)
1442 desc=super(argPOOLFile, self).prodsysDescription
1451 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1454 for fname
in inputs:
1455 if fname
not in self.
_value:
1457 "File {0} is not part of this agument: {1}".
format(fname, self))
1467 myDataDictionary = {
'POOL_MRG_INPUT' :
argPOOLFile(inputs, type=self.
type, io=
'input'),
1469 myMergeConf = executorConfig(myargdict, myDataDictionary)
1470 myMerger = athenaExecutor(name=
'POOLMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1471 skeletonCA =
'RecJobTransforms.MergePool_Skeleton',
1472 inData=
set([
'POOL_MRG_INPUT']), outData=
set([
'POOL_MRG_OUTPUT']),
1473 disableMT=
True, disableMP=
True)
1474 myMerger.doAll(input=
set([
'POOL_MRG_INPUT']), output=
set([
'POOL_MRG_OUTPUT']))
1478 for fname
in inputs:
1482 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1488 integrityFunction =
"returnIntegrityOfPOOLFile"
1492 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1495 for fname
in inputs:
1496 if fname
not in self.
_value:
1498 "File {0} is not part of this agument: {1}".
format(fname, self))
1501 mySubstepName =
'HITSMergeAthenaMP{0}'.
format(counter)
1505 myDataDictionary = {
'HITS' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1507 myMergeConf = executorConfig(myargdict, myDataDictionary)
1508 myMerger = athenaExecutor(name = mySubstepName,
1509 skeletonCA =
'SimuJobTransforms.HITSMerge_Skeleton',
1511 inData=
set([
'HITS']), outData=
set([
'HITS_MRG']),
1512 disableMT=
True, disableMP=
True)
1513 myMerger.doAll(input=
set([
'HITS']), output=
set([
'HITS_MRG']))
1517 for fname
in inputs:
1521 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1528 integrityFunction =
"returnIntegrityOfPOOLFile"
1532 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1535 for fname
in inputs:
1536 if fname
not in self.
_value:
1538 "File {0} is not part of this agument: {1}".
format(fname, self))
1541 mySubstepName =
'EVNT_TRMergeAthenaMP{0}'.
format(counter)
1547 myMergeConf = executorConfig(myargdict, myDataDictionary)
1548 myMerger = athenaExecutor(name = mySubstepName, skeletonFile =
'SimuJobTransforms/skeleton.EVNT_TRMerge.py',
1550 inData=
set([
'EVNT_TR']), outData=
set([
'EVNT_TR_MRG']),
1551 disableMT=
True, disableMP=
True)
1552 myMerger.doAll(input=
set([
'EVNT_TR']), output=
set([
'EVNT_TR_MRG']))
1556 for fname
in inputs:
1560 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1567 integrityFunction =
"returnIntegrityOfPOOLFile"
1571 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1574 for fname
in inputs:
1575 if fname
not in self.
_value:
1577 "File {0} is not part of this agument: {1}".
format(fname, self))
1583 myDataDictionary = {
'RDO' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1585 myMergeConf = executorConfig(myargdict, myDataDictionary)
1586 myMerger = athenaExecutor(name =
'RDOMergeAthenaMP{0}'.
format(counter),
1587 skeletonCA =
'SimuJobTransforms.RDOMerge_Skeleton',
1589 inData=
set([
'RDO']), outData=
set([
'RDO_MRG']),
1590 disableMT=
True, disableMP=
True)
1591 myMerger.doAll(input=
set([
'RDO']), output=
set([
'RDO_MRG']))
1595 for fname
in inputs:
1599 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1605 integrityFunction =
"returnIntegrityOfPOOLFile"
1609 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1612 for fname
in inputs:
1613 if fname
not in self.
_value:
1615 "File {0} is not part of this agument: {1}".
format(fname, self))
1618 mySubstepName =
'EVNTMergeAthenaMP{0}'.
format(counter)
1622 myDataDictionary = {
'EVNT' :
argEVNTFile(inputs, type=self.
type, io=
'input'),
1624 myMergeConf = executorConfig(myargdict, myDataDictionary)
1625 myMerger = athenaExecutor(name = mySubstepName, skeletonCA =
'EvgenJobTransforms.EVNTMerge_Skeleton',
1627 inData=
set([
'EVNT']), outData=
set([
'EVNT_MRG']),
1628 disableMT=
True, disableMP=
True)
1629 myMerger.doAll(input=
set([
'EVNT']), output=
set([
'EVNT_MRG']))
1633 for fname
in inputs:
1637 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1645 integrityFunction =
"returnIntegrityOfHISTFile"
1647 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', runarg=
True, countable=
True, multipleOK =
None,
1648 name=
None, auxiliaryFile=
False):
1649 super(argHISTFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1650 name=name, auxiliaryFile=auxiliaryFile)
1669 msg.error(
'Timeout counting events for {0}'.
format(fname))
1673 desc=super(argHISTFile, self).prodsysDescription
1681 integrityFunction =
"returnIntegrityOfNTUPFile"
1686 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', treeNames=
None, runarg=
True, multipleOK =
None,
1687 name=
None, mergeTargetSize=-1, auxiliaryFile=
False):
1688 super(argNTUPFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1689 name=name, mergeTargetSize=mergeTargetSize, auxiliaryFile=auxiliaryFile)
1698 if name
and 'NTUP_PILEUP' in name:
1704 msg.debug(
'Retrieving event count for NTUP files {0}'.
format(files))
1709 if myPRWEntries
is not None:
1711 if self.
name and 'NTUP_PILEUP' in self.
name:
1712 myPRWEntries =
PRWEntries(fileName=fname, integral=
True)
1717 if myPHYSVALEntries
is not None:
1719 if self.
name and 'NTUP_PHYSVAL' in self.
name:
1721 self.
_fileMetadata[fname][
'sumOfWeights'] = myPHYSVALEntries
1723 msg.debug(
'treeNames is set to None - event count undefined for this NTUP')
1730 msg.error(
'Timeout counting events for {0}'.
format(fname))
1735 from PyJobTransforms.trfValidateRootFile
import checkFile
1736 rc=
checkFile(fileName=fname, the_type=
'basket', requireTree=
False)
1744 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1747 for fname
in inputs:
1748 if fname
not in self.
_value:
1750 "File {0} is not part of this agument: {1}".
format(fname, self))
1758 myDataDictionary = {
'NTUP_MRG_INPUT' :
argNTUPFile(inputs, type=self.
type, io=
'input'),
1760 myMergeConf = executorConfig(myargdict, myDataDictionary)
1761 myMerger = NTUPMergeExecutor(name=
'NTUPMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1762 inData=
set([
'NTUP_MRG_INPUT']), outData=
set([
'NTUP_MRG_OUTPUT']))
# The output key must match the executor's outData ('NTUP_MRG_OUTPUT');
# the previous 'NYUP_MRG_OUTPUT' spelling was a typo.
myMerger.doAll(input=set(['NTUP_MRG_INPUT']), output=set(['NTUP_MRG_OUTPUT']))
1767 for fname
in inputs:
1771 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1777 desc=super(argNTUPFile, self).prodsysDescription
1788 f = bz2.BZ2File(fname,
'r')
1790 chunk = len(f.read(1024*1024))
1791 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1796 except OSError
as e:
1797 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1803 desc=super(argBZ2File, self).prodsysDescription
1809 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1810 super(argFTKIPFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1820 f = bz2.BZ2File(fname,
'r')
1822 if line.startswith(
'F'):
1825 except OSError
as e:
1826 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1831 desc=super(argFTKIPFile, self).prodsysDescription
1837 def __init__(self, value=list(), io =
'output', type=
'txt_evt', splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1838 super(argHepEvtAsciiFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg,
1839 multipleOK=multipleOK, name=name)
1849 tar = tarfile.open(fname,
"r:gz")
1850 for untar
in tar.getmembers():
1851 fileTXT = tar.extractfile(untar)
1852 if fileTXT
is not None :
1853 lines = fileTXT.read().
decode(
"utf-8")
1855 eventCount = lines.count(
'E ')
1857 except OSError
as e:
1858 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1863 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1864 super(argLHEFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1870 'lheSumOfNegWeights': 0,
1874 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1879 tar = tarfile.open(fname,
"r:gz")
1881 for untar
in tar.getmembers():
1882 fileTXT = tar.extractfile(untar)
1883 if fileTXT
is not None :
1884 lines = fileTXT.read().
decode(
"utf-8")
1885 lhecount = lines.count(
'/event')
1889 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1893 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1901 tar = tarfile.open(fname,
"r:gz")
1902 for untar
in tar.getmembers():
1903 fileTXT = tar.extractfile(untar)
1905 if fileTXT
is not None :
1906 lines = fileTXT.readlines()
1910 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1911 if w > 0 : weightPos += w
1912 else : weightNeg += abs(w)
1916 if "<event" in line :
1922 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1923 self.
_fileMetadata[fname][
'lheSumOfPosWeights'] =
'UNDEFINED'
1924 self.
_fileMetadata[fname][
'lheSumOfNegWeights'] =
'UNDEFINED'
1935 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', separator = ':
'):
1938 super(argSubstep, self).__init__(value, runarg, name)
1948 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
1951 elif isinstance(value, str):
1953 elif isinstance(value, (list, tuple)):
1957 if not isinstance(item, str):
1960 elif isinstance(value, dict):
1972 subStepMatch = re.match(
r'([a-zA-Z0-9,]+)' + self.
_separator +
r'(.*)', string)
1975 subStep = subStepMatch.group(1).
split(
',')
1976 subStepValue = subStepMatch.group(2)
1979 subStepValue = string
1980 msg.debug(
'Parsed {0} as substep {1}, argument {2}'.
format(string, subStep, subStepValue))
1981 for step
in subStep:
1982 subStepList.append((step, subStepValue))
1994 substep = exe.substep
1995 first = exe.conf.firstExecutor
2002 value = self.
_value[name]
2003 elif substep
in self.
_value:
2004 value = self.
_value[substep]
2005 elif first
and 'first' in self.
_value:
2006 value = self.
_value[
'first']
2007 elif 'default' in self.
_value:
2008 value = self.
_value[
'default']
2020 value = self.
_value[
'all']
2021 elif isinstance(value, list):
2022 value = self.
_value[
'all'] + value
2024 msg.debug(
'From substep argument {myvalue} picked value "{value}" for {name}, {substep}, first={first}'.
format(myvalue=self.
_value, value=value, name=name, substep=substep, first=first))
2030 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2045 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', splitter = None, separator=':
'):
2047 super(argSubstepList, self).__init__(value, runarg, name, defaultSubstep, separator)
2057 desc = {
'type':
'substep',
'substeptype':
'list',
'listtype':
'str',
2063 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2066 elif isinstance(value, str):
2068 elif isinstance(value, (list, tuple)):
2072 if not isinstance(item, str):
2075 for subStep
in subStepList:
2076 if subStep[0]
in self.
_value:
2079 self.
_value[subStep[0]] = subStep[1]
2080 elif isinstance(value, dict):
2081 for k, v
in value.items():
2082 if not isinstance(k, str):
2084 if not isinstance(v, list):
2095 subStepList = [(s[0], s[1].
split(self.
_splitter))
for s
in subStepList]
2097 subStepList = [(s[0], [s[1]])
for s
in subStepList]
2110 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2116 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2119 elif isinstance(value, str):
2121 self.
_value = dict([(subStep[0], subStep[1])
for subStep
in subStepList])
2122 elif isinstance(value, (list, tuple)):
2126 if not isinstance(item, str):
2129 for subStep
in subStepList:
2130 self.
_value[subStep[0]] = subStep[1]
2131 elif isinstance(value, dict):
2132 for k, v
in value.items():
2133 if not isinstance(k, str):
2135 if not isinstance(v, str):
2151 desc = {
'type':
'substep',
'substeptype':
'bool',
'separator': self.
_separator,
2157 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1})'.
format(value,
type(value)))
2160 elif isinstance(value, bool):
2162 elif isinstance(value, str):
2164 self.
_value = dict([(subStep[0],
strToBool(subStep[1]))
for subStep
in subStepList])
2165 elif isinstance(value, (list, tuple)):
2169 if not isinstance(item, str):
2172 for subStep
in subStepList:
2174 elif isinstance(value, dict):
2175 for k, v
in value.items():
2176 if not isinstance(k, str):
2178 if not isinstance(v, bool):
2195 desc = {
'type':
'substep',
'substeptype':
'int',
'separator': self.
_separator,
2201 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2205 elif isinstance(value, int):
2207 elif isinstance(value, str):
2209 self.
_value = dict([(subStep[0],
int(subStep[1]))
for subStep
in subStepList])
2210 elif isinstance(value, (list, tuple)):
2214 if not isinstance(item, str):
2217 for subStep
in subStepList:
2218 self.
_value[subStep[0]] =
int(subStep[1])
2219 elif isinstance(value, dict):
2220 for k, v
in value.items():
2221 if not isinstance(k, str):
2223 if not isinstance(v, int):
2236 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
2239 super(argSubstepFloat, self).
__init__(value = value, runarg = runarg, name=name)
2243 desc = {
'type':
'substep',
'substeptype':
'float',
'separator': self.
_separator,
2246 desc[
'min'] = self.
_min
2248 desc[
'max'] = self.
_max
2259 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2263 elif isinstance(value, float):
2265 elif isinstance(value, str):
2267 self.
_value = dict([(subStep[0],
float(subStep[1]))
for subStep
in subStepList])
2268 elif isinstance(value, (list, tuple)):
2272 if not isinstance(item, str):
2274 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2276 for subStep
in subStepList:
2278 elif isinstance(value, dict):
2279 for k, v
in value.items():
2280 if not isinstance(k, str):
2282 'Dictionary key {0!s} for substep is not a string'.
format(k))
2283 if not isinstance(v, float):
2285 'Dictionary value {0!s} for substep is not an float'.
format(v))
2289 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2292 if (self.
_min is not None and my_float < self.
_min)
or (self.
_max is not None and my_float > self.
_max):
2294 'argFloat value out of range: {0} is not between {1} and {2}'.
format(my_float, self.
_min, self.
_max))
2295 except ValueError
as e:
2297 'Failed to convert substep value {0} to float: {1}'.
format(value, e))
2313 'doRDO_TRIG': {
'RAWtoALL': [(
'in',
'-',
'RDO'), (
'in',
'+',
'RDO_TRIG'), (
'in',
'-',
'BS')]},
2314 'doOverlay': {
'HITtoRDO': [(
'in',
'-',
'HITS'), (
'out',
'-',
'RDO'), (
'out',
'-',
'RDO_FILT')],
2315 'Overlay': [(
'in',
'+', (
'HITS',
'RDO_BKG')), (
'out',
'+',
'RDO')]},
2316 'doFCtoDAOD': {
'Derivation': [(
'in',
'-',
'EVNT')]},
2317 'afterburn': {
'generate': [(
'out',
'-',
'EVNT')]},
2335 desc = {
'type':
'substep',
'substeptype':
'steering',
'listtype':
'str',
'separator': self.
_separator,
2344 msg.debug(
'Attempting to set argSubstepSteering from {0!s} (type {1})'.
format(value,
type(value)))
2348 elif isinstance(value, dict):
2350 for k, v
in value.items():
2351 if not isinstance(k, str)
or not isinstance(v, list):
2353 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2355 if not isinstance(subv, (list, tuple))
or len(subv) != 3
or subv[0]
not in (
'in',
'out')
or subv[1]
not in (
'+',
'-'):
2357 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2363 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2364 elif isinstance(value, (str, list, tuple)):
2365 if isinstance(value, str):
2367 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2371 if not isinstance(item, str):
2373 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2374 if item
in argSubstepSteering.steeringAlises:
2375 msg.debug(
"Found value {0} in steeringAlises ({1})".
format(item, argSubstepSteering.steeringAlises[item]))
2376 for substep, steerlist
in argSubstepSteering.steeringAlises[item].
items():
2377 if substep
in self.
_value:
2380 self.
_value[substep] = steerlist
2386 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2389 if string
in argSubstepSteering.steeringAlises:
2390 return argSubstepSteering.steeringAlises[string]
2394 for subvalue
in ivalue.split(
','):
2395 matchedParts = re.match(
r'(in|out)(\+|\-)([A-Z_]+)$', subvalue)
2396 if not matchedParts:
2398 'Failed to convert string {0!s} to argSubstepSteering'.
format(subvalue))
2399 retvalue.append((matchedParts.group(1), matchedParts.group(2), matchedParts.group(3)))
2411 msg.debug(
'Attempting to set argSubstepConditions from {0!s} (type {1}'.
format(value,
type(value)))
2413 super(self.__class__, self.__class__).value.fset(self, value)
2417 if "CurrentMC" == v:
2423 cmd =
"COMAGetGlobalTagNameByCurrentState --state=CurrentMC"
2424 return str(client.execute(cmd, format =
'dom_object').get_rows().pop()[
'globalTag'])
2428 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2446 super(trfArgParser, self).
__init__(*args, **kwargs)
2449 argName = args[0].lstrip(
'-')
2450 msg.debug(
'Found arg name {0}'.
format(argName))
2456 'Transform arguments may not use hyphens (use camelCase or underscore')
2461 msg.debug(
'Double definition of argument {0} - ignored'.
format(argName))
2465 if 'help' in kwargs:
2469 if 'action' in kwargs
and 'factory' in dir(kwargs[
'action']):
2470 self.
_argClass[argName] = kwargs[
'action'].factory
2471 elif 'type' in kwargs:
2472 self.
_argClass[argName] = kwargs[
'type']
2478 for arg
in (
'group',):
2480 strippedArgs[arg] = kwargs.pop(arg)
2484 for i
in range(1, len(args)):
2485 argAlias = args[i].lstrip(
'-')
2486 msg.debug(
'Adding an alias of {0}: {1}'.
format(argName, argAlias))
2490 if 'group' in strippedArgs:
2492 msg.debug(
'Adding argument to group {0}: ({1}; {2})'.
format(strippedArgs[
'group'], args, kwargs))
2496 msg.warning(
'Argument group {0} not defined - adding argument to main parser'.
format(strippedArgs[
'group']))
2497 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2498 super(trfArgParser, self).
add_argument(*args, **kwargs)
2500 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2501 super(trfArgParser, self).
add_argument(*args, **kwargs)
2507 msg.debug(
'Detected the local variable {0}'.
format(name))
2508 if argClass
is not None:
2509 desc[name] = argClass().prodsysDescription
2511 desc[name].update({
'help': self.
_helpString[name]})
2520 msg.warning(
'Argument group %s already exists', args[0])
2522 self.
_argGroups[args[0]] = self.add_argument_group(*args)
2535 keyArray = [
'--' +
str(key)
for key
in self.
_helpString if key
not in (
'h',
'verbose',
'loglevel',
'dumpargs',
'argdict') ]
2537 print(
'ListOfDefaultPositionalKeys={0}'.
format(keyArray))
2549 newValueObj = value[0]
2550 msg.debug(
'Started with: %s = %s',
type(newValueObj), newValueObj)
2551 if isinstance(value[0], argSubstep):
2554 elif isinstance(value[0], list):
2557 msg.debug(
'Handling a list of arguments for key')
2561 processedValueObj.value = processedValues
2562 newValues.append(processedValueObj)
2563 newValueObj = newValues
2564 return newValueObj, newValues
2565 elif isinstance(value[0].value, list):
2566 newValues = value[0].value
2567 elif isinstance(value[0].value, dict):
2568 newValues = value[0].value
2570 newValues = [value[0].value,]
2571 for valueObj
in value[1:]:
2572 msg.debug(
'Value Object: %s = %s',
type(valueObj), valueObj)
2573 if isinstance(value[0], argSubstep):
2576 elif isinstance(valueObj.value, list):
2578 newValues.extend(valueObj.value)
2579 elif isinstance(valueObj.value, dict):
2581 newValues.update(valueObj.value)
2583 newValues.append(valueObj.value)
2584 return newValueObj, newValues
2593 super(trfArgParser, self).
parse_args(args = args, namespace = namespace)
2595 namespace = super(trfArgParser, self).
parse_args(args = args)
2596 for k, v
in namespace.__dict__.items():
2597 msg.debug(
'Treating key %s (%s)', k, v)
2598 if isinstance(v, list):
2600 if not isinstance(newValueObj, list):
2601 newValueObj.value = newValues
2602 namespace.__dict__[k] = newValueObj
2603 msg.debug(
'Set to %s', newValues)
2611 msg.debug(
"converting string {string} to boolean".
format(string = string))
2612 if string.lower() ==
'false':
2614 elif string.lower() ==
'true':
2618 except AttributeError:
2632 allKeys =
set(dict1) |
set(dict2)
2636 if isinstance(
list(dict1.values())[0], list):
2638 elif len(dict2) > 0:
2639 if isinstance(
list(dict2.values())[0], list):
2643 mergeDict[key] = dict1.get(key, []) + dict2.get(key, [])
2646 if key
in dict1
and key
in dict2:
2648 if dict1[key] != dict2[key]:
2650 'Merging substep arguments found clashing values for substep {0}: {1}!={2}'.
format(key, dict1[key], dict2[key]))
2651 mergeDict[key] = dict1[key]
2653 mergeDict[key] = dict1[key]
2655 mergeDict[key] = dict2[key]