17 msg = logging.getLogger(__name__)
19 import PyJobTransforms.trfExceptions
as trfExceptions
21 from PyJobTransforms.trfFileUtils import athFileInterestingKeys, AthenaLiteFileInfo, NTUPEntries, HISTEntries, PRWEntries, urlType, ROOTGetSize
23 from PyJobTransforms.trfExeStepTools
import commonExecutorStepName
25 from PyJobTransforms.trfDecorators
import timelimited
33 msg.debug(
'Initialised class %s with args=%s; kwargs=%s', genclass, args, kwargs)
39 msg.debug(
'Called class %s with value=%s; args=%s; kwargs=%s', self.
_genclass, valueString, self.
_args, self.
_kwargs)
47 if valueString
is None:
51 except Exception
as e:
52 msg.fatal(
'Got this exception raised when calling object factory: {0}'.
format(e))
62 def __init__(self, factory, option_strings, dest, **kwargs):
64 super().
__init__(option_strings, dest, **kwargs)
66 def __call__(self, parser, namespace, values, option_string=None):
67 msg.debug(
'Called action for factory=%s; values=%s', self.
_factory, values)
70 if isinstance(values, list):
73 setattr(namespace, self.dest, [self.
_factory(
None)])
75 setattr(namespace, self.dest, [self.
_factory(v)
for v
in values])
77 setattr(namespace, self.dest, self.
_factory(values))
85 msg.debug(
'Initialised action class %s with args=%s; kwargs=%s', genclass, args, kwargs)
92 def __call__(self, option_strings, dest, **kwargs):
109 def __init__(self, value = None, runarg = True, name = None):
149 desc = {
'type' :
None}
154 return '{0}: Value {1} (isRunArg={2})'.
format(self.__class__.__name__, self.
_value, self.
_runarg)
162 return self.
value == other.value
165 return self.
value != other.value
168 return self.
value < other.value
171 return self.
value > other.value
182 def __init__(self, value = None, runarg = True, name = None, choices = None):
184 super(argString, self).
__init__(value = value, runarg = runarg, name=name)
214 desc = {
'type' :
'str'}
242 if isinstance(value, int):
248 except ValueError
as e:
250 'Failed to convert value {0} to int: {1}'.
format(value, e))
255 desc = {
'type' :
'int'}
267 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
270 super(argFloat, self).
__init__(value = value, runarg = runarg, name=name)
280 desc = {
'type' :
'float'}
282 desc[
'min'] = self.
_min
284 desc[
'max'] = self.
_max
296 if self.
_min is not None:
302 if isinstance(value, float):
308 'Failed to convert %s to a float' %
str(value))
312 'argFloat value out of range: %g is not between %s and %s' %
336 if isinstance(value, bool):
344 desc = {
'type' :
'bool'}
356 def __init__(self, value = [], supressEmptyStrings = True, splitter=',', runarg=True, name=None):
360 super(argList, self).
__init__(value = value, runarg = runarg, name=name)
371 desc = {
'type' :
'list',
'listtype':
'str'}
381 if isinstance(value, (list, tuple)):
392 except AttributeError:
394 'Failed to convert %s to a list' %
str(value))
428 if isinstance(value, list):
430 if not isinstance(v, int):
432 'Illegal value {0} in list of ints'.
format(v))
444 except (AttributeError, ValueError):
446 'Failed to convert %s to a list of ints' %
str(value))
453 desc = {
'type' :
'list',
'listtype' :
'int'}
465 def __init__(self, value = {}, supressEmptyStrings = True, splitter=',', kvsplitter=":
", runarg=True, name=None):
470 super(argList, self).__init__(value = value, runarg = runarg, name=name)
485 if isinstance(value, dict):
486 for k, v
in value.items():
487 if not isinstance(k, str):
489 'Illegal key argument type {0} in dictionary for argKeyFloatValueList'.
format(k))
490 if not isinstance(v, float):
492 'Illegal value argument type {0} in dictionary for argKeyFloatValueList'.
format(v))
501 kvlist = [ v
for v
in value.split(self.
_splitter)
if v !=
'' ]
507 except (AttributeError, ValueError):
509 'Failed to convert {0} to a dictionary of string:int values'.
format(value))
516 desc = {
'type' :
'list',
'listtype' :
'str:float'}
544 def __init__(self, value=list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, guid=
None,
545 multipleOK =
None, name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
576 if multipleOK
is None:
585 super(argFile, self).
__init__(value=value, splitter=splitter, runarg=runarg, name=name)
617 @mergeTargetSize.setter
626 if isinstance(self.
_type, dict):
628 desc = {
'type' :
'file',
'subtype' :
"NONE" }
632 desc = {
'type' :
'file',
'subtype' :
str(self.
_type).
upper()}
647 if isinstance(value, (list, tuple)):
648 if len(value) > 0
and isinstance(value[0], dict):
656 'Filename (key "lfn") not found in Tier-0 file dictionary: {0}'.
format(myfile))
657 for k, v
in myfile.items():
662 elif k ==
'checksum':
669 'Inconsistent dataset names in Tier-0 dictionary: {0} != {1}'.
format(self.
dataset, v))
679 if value.lower().startswith(
'lfn'):
681 from PyUtils.PoolFile
import file_name
690 except (AttributeError, TypeError):
692 'Failed to convert %s to a list' %
str(value))
695 deDuplicatedValue = []
697 if fname
not in deDuplicatedValue:
698 deDuplicatedValue.append(fname)
700 msg.warning(
"Removing duplicated file {0} from file list".
format(fname))
701 if len(self.
_value) != len(deDuplicatedValue):
702 self.
_value = deDuplicatedValue
703 msg.warning(
'File list after duplicate removal: {0}'.
format(self.
_value))
714 if self.
_io ==
'input':
720 msg.debug(
'Found POSIX filesystem input - activating globbing')
722 for filename
in self.
_value:
724 globbedFiles = glob.glob(filename)
725 if len(globbedFiles) == 0:
727 'Input file argument {0} globbed to NO input files - probably the file(s) are missing'.
format(filename))
730 newValue.extend(globbedFiles)
733 msg.debug (
'File input is globbed to %s' % self.
_value)
736 msg.debug(
'Found root filesystem input - activating globbing')
738 for filename
in self.
_value:
739 if str(filename).startswith(
"root"):
740 msg.debug(
'Found input file name starting with "root," setting XRD_RUNFORKHANDLER=1, which enables fork handlers for xrootd in direct I/O')
741 os.environ[
"XRD_RUNFORKHANDLER"] =
"1"
742 if str(filename).startswith(
"https")
or str(filename).startswith(
"davs")
or not(
str(filename).endswith(
'/'))
and '*' not in filename
and '?' not in filename:
743 msg.debug(
'Seems that only one file was given: {0}'.
format(filename))
744 newValue.extend(([filename]))
749 if '*' in filename
or '?' in filename:
750 msg.debug(
'Split input into path for listdir() and a filemask to select available files.')
751 path = filename[0:filename.rfind(
'/')+1]
752 msg.debug(
'path: {0}'.
format(path))
753 fileMask = filename[filename.rfind(
'/')+1:len(filename)]
754 msg.debug(
'Will select according to: {0}'.
format(fileMask))
756 cmd = [
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select' ]
757 if not os.access (
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select', os.X_OK ):
759 'No execute access to "eos.select" - could not glob EOS input files.')
766 proc = subprocess.Popen(args = cmd,bufsize = 1, shell =
False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
768 output = proc.stdout.readlines()
771 'EOS list command ("{0!s}") failed: rc {1}, output {2}'.
format(cmd, rc, output))
772 msg.debug(
"eos returned: {0}".
format(output))
775 myFiles += [
str(path)+
str(line.rstrip(
'\n'))]
777 patt = re.compile(fileMask.replace(
'*',
'.*').
replace(
'?',
'.'))
778 for srmFile
in myFiles:
780 if(patt.search(srmFile))
is not None:
782 msg.debug(
'match: ',srmFile)
783 newValue.extend(([srmFile]))
785 newValue.extend(([srmFile]))
787 msg.debug(
'Selected files: ', newValue)
788 except (AttributeError, TypeError, OSError):
790 'Failed to convert %s to a list' %
str(value))
791 if len(self.
_value) > 0
and len(newValue) == 0:
794 'Input file argument(s) {0!s} globbed to NO input files - ls command failed')
796 msg.debug (
'File input is globbed to %s' % self.
_value)
800 'Multiple file arguments are not supported for {0} (was given: {1}'.
format(self, self.
_value))
808 if value
not in (
'input',
'output',
'temporary'):
810 'File arguments must be specified as input, output or temporary - got {0}'.
format(value))
857 m = re.match(
r'(input|output|tmp.)([A-Za-z0-9_]+?)(File)?$', value)
859 msg.debug(
"ArgFile name setter matched this: {0}".
format(m.groups()))
860 if self.
_type is None:
861 dtype = m.group(2).
split(
'_', 1)[0]
863 if re.match(
r'D(RAW|ESD|AOD)', dtype):
865 msg.debug(
"Autoset data type to {0}".
format(dtype))
868 msg.debug(
"Autoset data subtype to {0}".
format(m.group(2)))
871 msg.debug(
"ArgFile name setter did not match against '{0}'".
format(value))
893 events = self.
getSingleMetadata(fname=fname, metadataKey=
'nentries', populate =
not fast)
895 msg.debug(
'Got events=None for file {0} - returning None for this instance'.
format(fname))
897 if events ==
'UNDEFINED':
898 msg.debug(
'Got events=UNDEFINED for file {0} - returning UNDEFINED for this instance'.
format(fname))
900 if not isinstance(events, int):
901 msg.warning(
'Got unexpected events metadata for file {0}: {1!s} - returning None for this instance'.
format(fname, events))
903 totalEvents += events
914 if files == []
or '_fileMetadata' not in dir(self):
916 for fname
in self.
value:
920 if fname
in self.
value:
927 if self.
_guid is not None:
928 msg.debug(
'Now trying to set file GUID metadata using {0}'.
format(self.
_guid))
933 msg.warning(
'Explicit GUID {0} was passed for file {1}, but this file is not a member of this instance'.
format(guid, fname))
941 def getMetadata(self, files = None, metadataKeys = None, maskMetadataKeys = None, populate = True, flush = False):
945 elif isinstance(files, str):
947 msg.debug(
'getMetadata will examine these files: {0!s}'.
format(files))
949 if metadataKeys
is None:
951 elif isinstance(metadataKeys, str):
952 metadataKeys = [metadataKeys,]
953 if maskMetadataKeys
is not None:
954 metadataKeys = [k
for k
in metadataKeys
if k
not in maskMetadataKeys]
955 msg.debug(
'getMetadata will retrieve these keys: {0!s}'.
format(metadataKeys))
958 msg.debug(
'Flushing cached metadata values')
962 msg.debug(
'Checking metadata values')
968 for mdkey
in metadataKeys:
974 msg.error(
'Did not find metadata key {0!s} for file {1!s} - setting to None'.
format(mdkey, fname))
975 metadata[fname][mdkey] =
None
986 if not (isinstance(fname, str)
and isinstance(metadataKey, str)):
988 'Illegal call to getSingleMetadata function: {0!s} {1!s}'.
format(fname, metadataKey))
989 md = self.
getMetadata(files = fname, metadataKeys = metadataKey, populate = populate, flush = flush)
990 return md[fname][metadataKey]
997 msg.debug(
'Retrieving metadata keys {1!s} for files {0!s}'.
format(files, metadataKeys))
1008 for key
in metadataKeys:
1009 if key !=
'_exists':
1013 for key
in metadataKeys:
1015 msg.debug(
'Metadata key {0} is unknown for {1}'.
format(key, self.__class__.__name__))
1019 msg.debug(
'Found cached value for {0}:{1} = {2!s}'.
format(fname, key, self.
_fileMetadata[fname][key]))
1021 msg.debug(
'No cached value for {0}:{1}. Calling generator function {2} ({3})'.
format(fname, key, self.
_metadataKeys[key].__name__, self.
_metadataKeys[key]))
1024 msg.info(
"Metadata generator called to obtain {0} for {1}".
format(key, files))
1027 msg.error(
'Calling {0!s} raised an exception: {1!s}'.
format(self.
_metadataKeys[key].__name__, e))
1029 msg.warning(
'Call to function {0} for {1} file {2} failed to populate metadata key {3}'.
format(self.
_metadataKeys[key].__name__, self.__class__.__name__, fname, key))
1050 for k, v
in metadataKeys.items():
1051 msg.debug(
'Manualy setting {0} for file {1} to {2}'.
format(k, fname, v))
1062 msg.debug(
'Testing for cached values for files {0} and keys {1}'.
format(files, metadataKeys))
1065 elif isinstance(files, str):
1067 if metadataKeys
is None:
1069 elif isinstance(metadataKeys, str):
1070 metadataKeys = (metadataKeys,)
1074 for key
in metadataKeys:
1076 isCachedFlag =
False
1078 if isCachedFlag
is False:
1093 for filename
in self.
_value:
1094 if filename.find(
'#') > -1:
1095 (dataset, fname) = filename.split(
'#', 1)
1096 newValue.append(fname)
1097 msg.debug(
'Current dataset: {0}; New dataset {1}'.
format(self.
_dataset, dataset))
1100 'Found inconsistent dataset assignment in argFile setup: %s != %s' % (self.
_dataset, dataset))
1102 if len(newValue) == 0:
1104 elif len(newValue) != len (self.
_value):
1106 'Found partial dataset assignment in argFile setup from {0} (dsn#lfn notation must be uniform for all inputs)'.
format(self.
_value))
1118 except OSError
as e:
1119 msg.error(
'Got exception {0!s} raised while stating file {1}'.
format(e, fname))
1123 msg.debug(
'Calling ROOT TFile.GetSize({0})'.
format(fname))
1134 with open(fname)
as f:
1137 chunk = len(f.read(1024*1024))
1138 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1142 except OSError
as e:
1143 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1145 except UnicodeDecodeError:
1146 msg.debug(
'Problem reading file as unicode, attempting with binary')
1149 with open(fname,
'rb')
as f:
1152 chunk = len(f.read(1024*1024))
1153 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1157 except OSError
as e:
1158 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1167 msg.debug(
'Generating a GUID for file {0}'.
format(fname))
1177 msg.debug(
'Testing existance for {0}'.
format(files))
1181 size = os.stat(fname).st_size
1184 msg.debug(
'POSIX file {0} exists'.
format(fname))
1185 except OSError
as e:
1186 msg.error(
'Got exception {0!s} raised while stating file {1} - probably it does not exist'.
format(e, fname))
1190 msg.debug(
'Calling ROOT TFile.GetSize({0})'.
format(fname))
1194 msg.error(
'Non-POSIX file {0} could not be opened - probably it does not exist'.
format(fname))
1196 msg.debug(
'Non-POSIX file {0} exists'.
format(fname))
1212 for arg
in copyArgs:
1214 myargdict[arg] = copy.copy(argdict[arg])
1217 myargdict = copy.copy(argdict)
1219 myargdict[
'checkEventCount'] =
argSubstepBool(
'False', runarg=
False)
1221 if 'athenaopts' in myargdict:
1224 for subStep
in myargdict[
'athenaopts'].value:
1227 for opt
in myargdict[
'athenaopts'].value[subStep]:
1228 if opt.startswith(
'--nprocs'):
1232 elif opt.startswith(
'--threads'):
1238 if hasNprocs
and hasNthreads:
1241 if opt.startswith(
'--threads'):
1250 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1251 super(argYODAFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1257 'lheSumOfNegWeights': 0,
1261 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1266 tar = tarfile.open(fname,
"r:gz")
1268 for untar
in tar.getmembers():
1269 fileTXT = tar.extractfile(untar)
1270 if fileTXT
is not None :
1271 lines = fileTXT.read().
decode(
"utf-8")
1272 lhecount = lines.count(
'/event')
1276 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1280 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1288 tar = tarfile.open(fname,
"r:gz")
1289 for untar
in tar.getmembers():
1290 fileTXT = tar.extractfile(untar)
1292 if fileTXT
is not None :
1293 for line
in fileTXT :
1294 line = line.decode(
"utf-8")
1297 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1298 if w > 0 : weightPos += w
1299 else : weightNeg += abs(w)
1303 if "<event" in line :
1309 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1316 def __init__(self, value = list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, multipleOK =
None,
1317 name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
1318 super(argAthenaFile, self).
__init__(value=value, subtype=subtype, io=io, type=type, splitter=splitter, runarg=runarg,
1319 multipleOK=multipleOK, name=name, executor=executor, mergeTargetSize=mergeTargetSize,
1320 auxiliaryFile=auxiliaryFile)
1323 for key
in athFileInterestingKeys:
1332 msg.debug(
'Will retrieve metadata info for {0!s}'.
format(myFiles))
1340 for fname
in myFiles:
1342 if athFileMetadata
is None:
1344 msg.debug(
'Setting metadata for file {0} to {1}'.
format(fname, athFileMetadata[fname]))
1349 self.
_callAthInfo(files, doAllFiles =
True, retrieveKeys=athFileInterestingKeys)
1353 desc=super(argAthenaFile, self).prodsysDescription
1360 integrityFunction =
"returnIntegrityOfBSFile"
1365 rc=
call([
"AtlListBSEvents",
"-c", fname], logger=msg, message=
"Report by AtlListBSEvents: ", timeout=600)
1375 desc=super(argBSFile, self).prodsysDescription
1386 msg.debug(
'selfMerge attempted for {0} -> {1} with {2} (index {3})'.
format(inputs, output, argdict, counter))
1389 for fname
in inputs:
1390 if fname
not in self.
_value:
1392 "File {0} is not part of this agument: {1}".
format(fname, self))
1398 myargdict[
'maskEmptyInputs'] =
argBool(
True)
1399 myargdict[
'allowRename'] =
argBool(
True)
1400 myargdict[
'emptyStubFile'] =
argString(inputs[0])
1405 myDataDictionary = {
'BS_MRG_INPUT' :
argBSFile(inputs, type=self.
type, io=
'input'),
1406 'BS_MRG_OUTPUT' :
argBSFile(output, type=self.
type, io=
'output')}
1407 myMergeConf = executorConfig(myargdict, myDataDictionary)
1408 myMerger = bsMergeExecutor(name=
'BSMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf, exe =
'file_merging',
1409 inData=
set([
'BS_MRG_INPUT']), outData=
set([
'BS_MRG_OUTPUT']))
1410 myMerger.doAll(input=
set([
'BS_MRG_INPUT']), output=
set([
'BS_MRG_OUTPUT']))
1414 for fname
in inputs:
1418 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1427 integrityFunction =
"returnIntegrityOfPOOLFile"
1433 from PyJobTransforms.trfValidateRootFile
import checkFile
1434 rc=
checkFile(fileName=fname, the_type=
'event', requireTree=
False)
1442 desc=super(argPOOLFile, self).prodsysDescription
1451 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1454 for fname
in inputs:
1455 if fname
not in self.
_value:
1457 "File {0} is not part of this agument: {1}".
format(fname, self))
1467 myDataDictionary = {
'POOL_MRG_INPUT' :
argPOOLFile(inputs, type=self.
type, io=
'input'),
1469 myMergeConf = executorConfig(myargdict, myDataDictionary)
1470 myMerger = athenaExecutor(name=
'POOLMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1471 skeletonCA =
'RecJobTransforms.MergePool_Skeleton',
1472 inData=
set([
'POOL_MRG_INPUT']), outData=
set([
'POOL_MRG_OUTPUT']),
1473 disableMT=
True, disableMP=
True)
1474 myMerger.doAll(input=
set([
'POOL_MRG_INPUT']), output=
set([
'POOL_MRG_OUTPUT']))
1478 for fname
in inputs:
1482 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1488 integrityFunction =
"returnIntegrityOfPOOLFile"
1492 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1495 for fname
in inputs:
1496 if fname
not in self.
_value:
1498 "File {0} is not part of this agument: {1}".
format(fname, self))
1501 mySubstepName =
'HITSMergeAthenaMP{0}'.
format(counter)
1505 myDataDictionary = {
'HITS' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1507 myMergeConf = executorConfig(myargdict, myDataDictionary)
1508 myMerger = athenaExecutor(name = mySubstepName,
1509 skeletonCA =
'SimuJobTransforms.HITSMerge_Skeleton',
1511 inData=
set([
'HITS']), outData=
set([
'HITS_MRG']),
1512 disableMT=
True, disableMP=
True)
1513 myMerger.doAll(input=
set([
'HITS']), output=
set([
'HITS_MRG']))
1517 for fname
in inputs:
1521 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1528 integrityFunction =
"returnIntegrityOfPOOLFile"
1532 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1535 for fname
in inputs:
1536 if fname
not in self.
_value:
1538 "File {0} is not part of this agument: {1}".
format(fname, self))
1541 mySubstepName =
'EVNT_TRMergeAthenaMP{0}'.
format(counter)
1547 myMergeConf = executorConfig(myargdict, myDataDictionary)
1548 myMerger = athenaExecutor(name = mySubstepName, skeletonFile =
'SimuJobTransforms/skeleton.EVNT_TRMerge.py',
1550 inData=
set([
'EVNT_TR']), outData=
set([
'EVNT_TR_MRG']),
1551 disableMT=
True, disableMP=
True)
1552 myMerger.doAll(input=
set([
'EVNT_TR']), output=
set([
'EVNT_TR_MRG']))
1556 for fname
in inputs:
1560 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1567 integrityFunction =
"returnIntegrityOfPOOLFile"
1571 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1574 for fname
in inputs:
1575 if fname
not in self.
_value:
1577 "File {0} is not part of this agument: {1}".
format(fname, self))
1583 myDataDictionary = {
'RDO' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1585 myMergeConf = executorConfig(myargdict, myDataDictionary)
1586 myMerger = athenaExecutor(name =
'RDOMergeAthenaMP{0}'.
format(counter),
1587 skeletonCA =
'SimuJobTransforms.RDOMerge_Skeleton',
1589 inData=
set([
'RDO']), outData=
set([
'RDO_MRG']),
1590 disableMT=
True, disableMP=
True)
1591 myMerger.doAll(input=
set([
'RDO']), output=
set([
'RDO_MRG']))
1595 for fname
in inputs:
1599 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1605 integrityFunction =
"returnIntegrityOfPOOLFile"
1609 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1612 for fname
in inputs:
1613 if fname
not in self.
_value:
1615 "File {0} is not part of this agument: {1}".
format(fname, self))
1618 mySubstepName =
'EVNTMergeAthenaMP{0}'.
format(counter)
1622 myDataDictionary = {
'EVNT' :
argEVNTFile(inputs, type=self.
type, io=
'input'),
1624 myMergeConf = executorConfig(myargdict, myDataDictionary)
1625 myMerger = athenaExecutor(name = mySubstepName, skeletonCA =
'EvgenJobTransforms.EVNTMerge_Skeleton',
1627 inData=
set([
'EVNT']), outData=
set([
'EVNT_MRG']),
1628 disableMT=
True, disableMP=
True)
1629 myMerger.doAll(input=
set([
'EVNT']), output=
set([
'EVNT_MRG']))
1633 for fname
in inputs:
1637 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1645 integrityFunction =
"returnIntegrityOfHISTFile"
1647 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', runarg=
True, countable=
True, multipleOK =
None,
1648 name=
None, auxiliaryFile=
False):
1649 super(argHISTFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1650 name=name, auxiliaryFile=auxiliaryFile)
1669 msg.error(
'Timeout counting events for {0}'.
format(fname))
1673 desc=super(argHISTFile, self).prodsysDescription
1681 integrityFunction =
"returnIntegrityOfNTUPFile"
1686 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', treeNames=
None, runarg=
True, multipleOK =
None,
1687 name=
None, mergeTargetSize=-1, auxiliaryFile=
False):
1688 super(argNTUPFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1689 name=name, mergeTargetSize=mergeTargetSize, auxiliaryFile=auxiliaryFile)
1698 if name
and 'NTUP_PILEUP' in name:
1704 msg.debug(
'Retrieving event count for NTUP files {0}'.
format(files))
1709 if myEntries
is not None:
1711 if self.
name and 'NTUP_PILEUP' in self.
name:
1712 myEntries =
PRWEntries(fileName=fname, integral=
True)
1715 msg.debug(
'treeNames is set to None - event count undefined for this NTUP')
1722 msg.error(
'Timeout counting events for {0}'.
format(fname))
1727 from PyJobTransforms.trfValidateRootFile
import checkFile
1728 rc=
checkFile(fileName=fname, the_type=
'basket', requireTree=
False)
1736 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1739 for fname
in inputs:
1740 if fname
not in self.
_value:
1742 "File {0} is not part of this agument: {1}".
format(fname, self))
1750 myDataDictionary = {
'NTUP_MRG_INPUT' :
argNTUPFile(inputs, type=self.
type, io=
'input'),
1752 myMergeConf = executorConfig(myargdict, myDataDictionary)
1753 myMerger = NTUPMergeExecutor(name=
'NTUPMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1754 inData=
set([
'NTUP_MRG_INPUT']), outData=
set([
'NTUP_MRG_OUTPUT']))
1755 myMerger.doAll(input=
set([
'NTUP_MRG_INPUT']), output=
set([
'NYUP_MRG_OUTPUT']))
1759 for fname
in inputs:
1763 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1769 desc=super(argNTUPFile, self).prodsysDescription
1780 f = bz2.BZ2File(fname,
'r')
1782 chunk = len(f.read(1024*1024))
1783 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1788 except OSError
as e:
1789 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1795 desc=super(argBZ2File, self).prodsysDescription
1801 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1802 super(argFTKIPFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1812 f = bz2.BZ2File(fname,
'r')
1814 if line.startswith(
'F'):
1817 except OSError
as e:
1818 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1823 desc=super(argFTKIPFile, self).prodsysDescription
1829 def __init__(self, value=list(), io =
'output', type=
'txt_evt', splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1830 super(argHepEvtAsciiFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg,
1831 multipleOK=multipleOK, name=name)
1840 f =
open(fname,
'r')
1842 if len(line.split(
" "))==3:
1845 except OSError
as e:
1846 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1851 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1852 super(argLHEFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1858 'lheSumOfNegWeights': 0,
1862 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1867 tar = tarfile.open(fname,
"r:gz")
1869 for untar
in tar.getmembers():
1870 fileTXT = tar.extractfile(untar)
1871 if fileTXT
is not None :
1872 lines = fileTXT.read().
decode(
"utf-8")
1873 lhecount = lines.count(
'/event')
1877 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1881 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1889 tar = tarfile.open(fname,
"r:gz")
1890 for untar
in tar.getmembers():
1891 fileTXT = tar.extractfile(untar)
1893 if fileTXT
is not None :
1894 lines = fileTXT.readlines()
1898 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1899 if w > 0 : weightPos += w
1900 else : weightNeg += abs(w)
1904 if "<event" in line :
1910 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1911 self.
_fileMetadata[fname][
'lheSumOfPosWeights'] =
'UNDEFINED'
1912 self.
_fileMetadata[fname][
'lheSumOfNegWeights'] =
'UNDEFINED'
1923 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', separator = ':
'):
1926 super(argSubstep, self).__init__(value, runarg, name)
1936 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
1939 elif isinstance(value, str):
1941 elif isinstance(value, (list, tuple)):
1945 if not isinstance(item, str):
1948 elif isinstance(value, dict):
1960 subStepMatch = re.match(
r'([a-zA-Z0-9,]+)' + self.
_separator +
r'(.*)', string)
1963 subStep = subStepMatch.group(1).
split(
',')
1964 subStepValue = subStepMatch.group(2)
1967 subStepValue = string
1968 msg.debug(
'Parsed {0} as substep {1}, argument {2}'.
format(string, subStep, subStepValue))
1969 for step
in subStep:
1970 subStepList.append((step, subStepValue))
1982 substep = exe.substep
1983 first = exe.conf.firstExecutor
1990 value = self.
_value[name]
1991 elif substep
in self.
_value:
1992 value = self.
_value[substep]
1993 elif first
and 'first' in self.
_value:
1994 value = self.
_value[
'first']
1995 elif 'default' in self.
_value:
1996 value = self.
_value[
'default']
2008 value = self.
_value[
'all']
2009 elif isinstance(value, list):
2010 value = self.
_value[
'all'] + value
2012 msg.debug(
'From substep argument {myvalue} picked value "{value}" for {name}, {substep}, first={first}'.
format(myvalue=self.
_value, value=value, name=name, substep=substep, first=first))
2018 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2033 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', splitter = None, separator=':
'):
2035 super(argSubstepList, self).__init__(value, runarg, name, defaultSubstep, separator)
2045 desc = {
'type':
'substep',
'substeptype':
'list',
'listtype':
'str',
2051 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2054 elif isinstance(value, str):
2056 elif isinstance(value, (list, tuple)):
2060 if not isinstance(item, str):
2063 for subStep
in subStepList:
2064 if subStep[0]
in self.
_value:
2065 self.
_value[subStep[0]].extend(subStep[1])
2067 self.
_value[subStep[0]] = subStep[1]
2068 elif isinstance(value, dict):
2069 for k, v
in value.items():
2070 if not isinstance(k, str):
2072 if not isinstance(v, list):
2083 subStepList = [(s[0], s[1].
split(self.
_splitter))
for s
in subStepList]
2085 subStepList = [(s[0], [s[1]])
for s
in subStepList]
2098 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2104 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2107 elif isinstance(value, str):
2109 self.
_value = dict([(subStep[0], subStep[1])
for subStep
in subStepList])
2110 elif isinstance(value, (list, tuple)):
2114 if not isinstance(item, str):
2117 for subStep
in subStepList:
2118 self.
_value[subStep[0]] = subStep[1]
2119 elif isinstance(value, dict):
2120 for k, v
in value.items():
2121 if not isinstance(k, str):
2123 if not isinstance(v, str):
2139 desc = {
'type':
'substep',
'substeptype':
'bool',
'separator': self.
_separator,
2145 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1})'.
format(value,
type(value)))
2148 elif isinstance(value, bool):
2150 elif isinstance(value, str):
2152 self.
_value = dict([(subStep[0],
strToBool(subStep[1]))
for subStep
in subStepList])
2153 elif isinstance(value, (list, tuple)):
2157 if not isinstance(item, str):
2160 for subStep
in subStepList:
2162 elif isinstance(value, dict):
2163 for k, v
in value.items():
2164 if not isinstance(k, str):
2166 if not isinstance(v, bool):
2183 desc = {
'type':
'substep',
'substeptype':
'int',
'separator': self.
_separator,
2189 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2193 elif isinstance(value, int):
2195 elif isinstance(value, str):
2197 self.
_value = dict([(subStep[0],
int(subStep[1]))
for subStep
in subStepList])
2198 elif isinstance(value, (list, tuple)):
2202 if not isinstance(item, str):
2205 for subStep
in subStepList:
2206 self.
_value[subStep[0]] =
int(subStep[1])
2207 elif isinstance(value, dict):
2208 for k, v
in value.items():
2209 if not isinstance(k, str):
2211 if not isinstance(v, int):
2224 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
2227 super(argSubstepFloat, self).
__init__(value = value, runarg = runarg, name=name)
2231 desc = {
'type':
'substep',
'substeptype':
'float',
'separator': self.
_separator,
2234 desc[
'min'] = self.
_min
2236 desc[
'max'] = self.
_max
2247 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2251 elif isinstance(value, float):
2253 elif isinstance(value, str):
2255 self.
_value = dict([(subStep[0],
float(subStep[1]))
for subStep
in subStepList])
2256 elif isinstance(value, (list, tuple)):
2260 if not isinstance(item, str):
2262 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2264 for subStep
in subStepList:
2266 elif isinstance(value, dict):
2267 for k, v
in value.items():
2268 if not isinstance(k, str):
2270 'Dictionary key {0!s} for substep is not a string'.
format(k))
2271 if not isinstance(v, float):
2273 'Dictionary value {0!s} for substep is not an float'.
format(v))
2277 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2280 if (self.
_min is not None and my_float < self.
_min)
or (self.
_max is not None and my_float > self.
_max):
2282 'argFloat value out of range: {0} is not between {1} and {2}'.
format(my_float, self.
_min, self.
_max))
2283 except ValueError
as e:
2285 'Failed to convert substep value {0} to float: {1}'.
format(value, e))
2301 'doRDO_TRIG': {
'RAWtoALL': [(
'in',
'-',
'RDO'), (
'in',
'+',
'RDO_TRIG'), (
'in',
'-',
'BS')]},
2302 'doOverlay': {
'HITtoRDO': [(
'in',
'-',
'HITS'), (
'out',
'-',
'RDO'), (
'out',
'-',
'RDO_FILT')],
2303 'Overlay': [(
'in',
'+', (
'HITS',
'RDO_BKG')), (
'out',
'+',
'RDO')]},
2304 'doFCwOverlay': {
'EVNTtoRDO': [(
'in',
'-',
'EVNT'), (
'out',
'-',
'RDO')],
2305 'EVNTtoRDOwOverlay': [(
'in',
'+', (
'EVNT',
'RDO_BKG')), (
'out',
'+',
'RDO'), (
'out',
'+',
'RDO_SGNL')]},
2306 'afterburn': {
'generate': [(
'out',
'-',
'EVNT')]},
2324 desc = {
'type':
'substep',
'substeptype':
'steering',
'listtype':
'str',
'separator': self.
_separator,
2333 msg.debug(
'Attempting to set argSubstepSteering from {0!s} (type {1})'.
format(value,
type(value)))
2337 elif isinstance(value, dict):
2339 for k, v
in value.items():
2340 if not isinstance(k, str)
or not isinstance(v, list):
2342 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2344 if not isinstance(subv, (list, tuple))
or len(subv) != 3
or subv[0]
not in (
'in',
'out')
or subv[1]
not in (
'+',
'-'):
2346 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2352 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2353 elif isinstance(value, (str, list, tuple)):
2354 if isinstance(value, str):
2356 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2360 if not isinstance(item, str):
2362 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2363 if item
in argSubstepSteering.steeringAlises:
2364 msg.debug(
"Found value {0} in steeringAlises ({1})".
format(item, argSubstepSteering.steeringAlises[item]))
2365 for substep, steerlist
in argSubstepSteering.steeringAlises[item].
items():
2366 if substep
in self.
_value:
2367 self.
_value[substep].extend(steerlist)
2369 self.
_value[substep] = steerlist
2375 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2378 if string
in argSubstepSteering.steeringAlises:
2379 return argSubstepSteering.steeringAlises[string]
2383 for subvalue
in ivalue.split(
','):
2384 matchedParts = re.match(
r'(in|out)(\+|\-)([A-Z_]+)$', subvalue)
2385 if not matchedParts:
2387 'Failed to convert string {0!s} to argSubstepSteering'.
format(subvalue))
2388 retvalue.append((matchedParts.group(1), matchedParts.group(2), matchedParts.group(3)))
2400 msg.debug(
'Attempting to set argSubstepConditions from {0!s} (type {1}'.
format(value,
type(value)))
2402 super(self.__class__, self.__class__).value.fset(self, value)
2406 if "CurrentMC" == v:
2412 cmd =
"COMAGetGlobalTagNameByCurrentState --state=CurrentMC"
2413 return str(client.execute(cmd, format =
'dom_object').get_rows().pop()[
'globalTag'])
2417 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2435 super(trfArgParser, self).
__init__(*args, **kwargs)
2438 argName = args[0].lstrip(
'-')
2439 msg.debug(
'Found arg name {0}'.
format(argName))
2445 'Transform arguments may not use hyphens (use camelCase or underscore')
2450 msg.debug(
'Double definition of argument {0} - ignored'.
format(argName))
2454 if 'help' in kwargs:
2458 if 'action' in kwargs
and 'factory' in dir(kwargs[
'action']):
2459 self.
_argClass[argName] = kwargs[
'action'].factory
2460 elif 'type' in kwargs:
2461 self.
_argClass[argName] = kwargs[
'type']
2467 for arg
in (
'group',):
2469 strippedArgs[arg] = kwargs.pop(arg)
2473 for i
in range(1, len(args)):
2474 argAlias = args[i].lstrip(
'-')
2475 msg.debug(
'Adding an alias of {0}: {1}'.
format(argName, argAlias))
2479 if 'group' in strippedArgs:
2481 msg.debug(
'Adding argument to group {0}: ({1}; {2})'.
format(strippedArgs[
'group'], args, kwargs))
2485 msg.warning(
'Argument group {0} not defined - adding argument to main parser'.
format(strippedArgs[
'group']))
2486 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2487 super(trfArgParser, self).
add_argument(*args, **kwargs)
2489 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2490 super(trfArgParser, self).
add_argument(*args, **kwargs)
2496 msg.debug(
'Detected the local variable {0}'.
format(name))
2497 if argClass
is not None:
2498 desc[name] = argClass().prodsysDescription
2509 msg.warning(
'Argument group %s already exists', args[0])
2511 self.
_argGroups[args[0]] = self.add_argument_group(*args)
2524 keyArray = [
'--' +
str(key)
for key
in self.
_helpString if key
not in (
'h',
'verbose',
'loglevel',
'dumpargs',
'argdict') ]
2526 print(
'ListOfDefaultPositionalKeys={0}'.
format(keyArray))
2538 newValueObj = value[0]
2539 msg.debug(
'Started with: %s = %s',
type(newValueObj), newValueObj)
2540 if isinstance(value[0], argSubstep):
2543 elif isinstance(value[0], list):
2546 msg.debug(
'Handling a list of arguments for key')
2550 processedValueObj.value = processedValues
2551 newValues.append(processedValueObj)
2552 newValueObj = newValues
2553 return newValueObj, newValues
2554 elif isinstance(value[0].value, list):
2555 newValues = value[0].value
2556 elif isinstance(value[0].value, dict):
2557 newValues = value[0].value
2559 newValues = [value[0].value,]
2560 for valueObj
in value[1:]:
2561 msg.debug(
'Value Object: %s = %s',
type(valueObj), valueObj)
2562 if isinstance(value[0], argSubstep):
2565 elif isinstance(valueObj.value, list):
2567 newValues.extend(valueObj.value)
2568 elif isinstance(valueObj.value, dict):
2570 newValues.update(valueObj.value)
2572 newValues.append(valueObj.value)
2573 return newValueObj, newValues
2582 super(trfArgParser, self).
parse_args(args = args, namespace = namespace)
2584 namespace = super(trfArgParser, self).
parse_args(args = args)
2585 for k, v
in namespace.__dict__.items():
2586 msg.debug(
'Treating key %s (%s)', k, v)
2587 if isinstance(v, list):
2589 if not isinstance(newValueObj, list):
2590 newValueObj.value = newValues
2591 namespace.__dict__[k] = newValueObj
2592 msg.debug(
'Set to %s', newValues)
2600 msg.debug(
"converting string {string} to boolean".
format(string = string))
2601 if string.lower() ==
'false':
2603 elif string.lower() ==
'true':
2607 except AttributeError:
2621 allKeys =
set(dict1) |
set(dict2)
2625 if isinstance(
list(dict1.values())[0], list):
2627 elif len(dict2) > 0:
2628 if isinstance(
list(dict2.values())[0], list):
2632 mergeDict[key] = dict1.get(key, []) + dict2.get(key, [])
2635 if key
in dict1
and key
in dict2:
2637 if dict1[key] != dict2[key]:
2639 'Merging substep arguments found clashing values for substep {0}: {1}!={2}'.
format(key, dict1[key], dict2[key]))
2640 mergeDict[key] = dict1[key]
2642 mergeDict[key] = dict1[key]
2644 mergeDict[key] = dict2[key]