17 msg = logging.getLogger(__name__)
19 import PyJobTransforms.trfExceptions
as trfExceptions
21 from PyJobTransforms.trfFileUtils import athFileInterestingKeys, AthenaLiteFileInfo, NTUPEntries, HISTEntries, PHYSVALEntries, PRWEntries, urlType, ROOTGetSize
23 from PyJobTransforms.trfExeStepTools
import commonExecutorStepName
25 from PyJobTransforms.trfDecorators
import timelimited
33 msg.debug(
'Initialised class %s with args=%s; kwargs=%s', genclass, args, kwargs)
39 msg.debug(
'Called class %s with value=%s; args=%s; kwargs=%s', self.
_genclass, valueString, self.
_args, self.
_kwargs)
47 if valueString
is None:
51 except Exception
as e:
52 msg.fatal(
'Got this exception raised when calling object factory: {0}'.
format(e))
62 def __init__(self, factory, option_strings, dest, **kwargs):
64 super().
__init__(option_strings, dest, **kwargs)
66 def __call__(self, parser, namespace, values, option_string=None):
67 msg.debug(
'Called action for factory=%s; values=%s', self.
_factory, values)
70 if isinstance(values, list):
73 setattr(namespace, self.dest, [self.
_factory(
None)])
75 setattr(namespace, self.dest, [self.
_factory(v)
for v
in values])
77 setattr(namespace, self.dest, self.
_factory(values))
85 msg.debug(
'Initialised action class %s with args=%s; kwargs=%s', genclass, args, kwargs)
92 def __call__(self, option_strings, dest, **kwargs):
109 def __init__(self, value = None, runarg = True, name = None):
149 desc = {
'type' :
None}
154 return '{0}: Value {1} (isRunArg={2})'.
format(self.__class__.__name__, self.
_value, self.
_runarg)
162 return self.
value == other.value
165 return self.
value != other.value
168 return self.
value < other.value
171 return self.
value > other.value
182 def __init__(self, value = None, runarg = True, name = None, choices = None):
184 super(argString, self).
__init__(value = value, runarg = runarg, name=name)
214 desc = {
'type' :
'str'}
242 if isinstance(value, int):
248 except ValueError
as e:
250 'Failed to convert value {0} to int: {1}'.
format(value, e))
255 desc = {
'type' :
'int'}
267 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
270 super(argFloat, self).
__init__(value = value, runarg = runarg, name=name)
280 desc = {
'type' :
'float'}
282 desc[
'min'] = self.
_min
284 desc[
'max'] = self.
_max
296 if self.
_min is not None:
302 if isinstance(value, float):
308 'Failed to convert %s to a float' %
str(value))
312 'argFloat value out of range: %g is not between %s and %s' %
336 if isinstance(value, bool):
344 desc = {
'type' :
'bool'}
356 def __init__(self, value = [], supressEmptyStrings = True, splitter=',', runarg=True, name=None):
360 super(argList, self).
__init__(value = value, runarg = runarg, name=name)
371 desc = {
'type' :
'list',
'listtype':
'str'}
381 if isinstance(value, (list, tuple)):
392 except AttributeError:
394 'Failed to convert %s to a list' %
str(value))
428 if isinstance(value, list):
430 if not isinstance(v, int):
432 'Illegal value {0} in list of ints'.
format(v))
444 except (AttributeError, ValueError):
446 'Failed to convert %s to a list of ints' %
str(value))
453 desc = {
'type' :
'list',
'listtype' :
'int'}
465 def __init__(self, value = {}, supressEmptyStrings = True, splitter=',', kvsplitter=":
", runarg=True, name=None):
470 super(argList, self).__init__(value = value, runarg = runarg, name=name)
485 if isinstance(value, dict):
486 for k, v
in value.items():
487 if not isinstance(k, str):
489 'Illegal key argument type {0} in dictionary for argKeyFloatValueList'.
format(k))
490 if not isinstance(v, float):
492 'Illegal value argument type {0} in dictionary for argKeyFloatValueList'.
format(v))
501 kvlist = [ v
for v
in value.split(self.
_splitter)
if v !=
'' ]
507 except (AttributeError, ValueError):
509 'Failed to convert {0} to a dictionary of string:int values'.
format(value))
516 desc = {
'type' :
'list',
'listtype' :
'str:float'}
544 def __init__(self, value=list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, guid=
None,
545 multipleOK =
None, name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
576 if multipleOK
is None:
585 super(argFile, self).
__init__(value=value, splitter=splitter, runarg=runarg, name=name)
617 @mergeTargetSize.setter
626 if isinstance(self.
_type, dict):
628 desc = {
'type' :
'file',
'subtype' :
"NONE" }
632 desc = {
'type' :
'file',
'subtype' :
str(self.
_type).
upper()}
647 if isinstance(value, (list, tuple)):
648 if len(value) > 0
and isinstance(value[0], dict):
656 'Filename (key "lfn") not found in Tier-0 file dictionary: {0}'.
format(myfile))
657 for k, v
in myfile.items():
662 elif k ==
'checksum':
669 'Inconsistent dataset names in Tier-0 dictionary: {0} != {1}'.
format(self.
dataset, v))
679 if value.lower().startswith(
'lfn'):
681 from PyUtils.PoolFile
import file_name
688 if self.
_io ==
'output' and (
'[' in value)
and (
']' in value):
694 except (AttributeError, TypeError):
696 'Failed to convert %s to a list' %
str(value))
699 deDuplicatedValue = []
701 if fname
not in deDuplicatedValue:
702 deDuplicatedValue.append(fname)
704 msg.warning(
"Removing duplicated file {0} from file list".
format(fname))
705 if len(self.
_value) != len(deDuplicatedValue):
706 self.
_value = deDuplicatedValue
707 msg.warning(
'File list after duplicate removal: {0}'.
format(self.
_value))
718 if self.
_io ==
'input':
724 msg.debug(
'Found POSIX filesystem input - activating globbing')
726 for filename
in self.
_value:
728 globbedFiles = glob.glob(filename)
729 if len(globbedFiles) == 0:
731 'Input file argument {0} globbed to NO input files - probably the file(s) are missing'.
format(filename))
734 newValue.extend(globbedFiles)
737 msg.debug (
'File input is globbed to %s' % self.
_value)
740 msg.debug(
'Found root filesystem input - activating globbing')
742 for filename
in self.
_value:
743 if str(filename).startswith(
"root"):
744 msg.debug(
'Found input file name starting with "root," setting XRD_RUNFORKHANDLER=1, which enables fork handlers for xrootd in direct I/O')
745 os.environ[
"XRD_RUNFORKHANDLER"] =
"1"
746 if str(filename).startswith(
"https")
or str(filename).startswith(
"davs")
or not(
str(filename).endswith(
'/'))
and '*' not in filename
and '?' not in filename:
747 msg.debug(
'Seems that only one file was given: {0}'.
format(filename))
748 newValue.extend(([filename]))
753 if '*' in filename
or '?' in filename:
754 msg.debug(
'Split input into path for listdir() and a filemask to select available files.')
755 path = filename[0:filename.rfind(
'/')+1]
756 msg.debug(
'path: {0}'.
format(path))
757 fileMask = filename[filename.rfind(
'/')+1:len(filename)]
758 msg.debug(
'Will select according to: {0}'.
format(fileMask))
760 cmd = [
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select' ]
761 if not os.access (
'/afs/cern.ch/project/eos/installation/atlas/bin/eos.select', os.X_OK ):
763 'No execute access to "eos.select" - could not glob EOS input files.')
770 proc = subprocess.Popen(args = cmd,bufsize = 1, shell =
False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
772 output = proc.stdout.readlines()
775 'EOS list command ("{0!s}") failed: rc {1}, output {2}'.
format(cmd, rc, output))
776 msg.debug(
"eos returned: {0}".
format(output))
779 myFiles += [
str(path)+
str(line.rstrip(
'\n'))]
781 patt = re.compile(fileMask.replace(
'*',
'.*').
replace(
'?',
'.'))
782 for srmFile
in myFiles:
784 if(patt.search(srmFile))
is not None:
786 msg.debug(
'match: %s',srmFile)
787 newValue.extend(([srmFile]))
789 newValue.extend(([srmFile]))
791 msg.debug(
'Selected files: %s', newValue)
792 except (AttributeError, TypeError, OSError):
794 'Failed to convert %s to a list' %
str(value))
795 if len(self.
_value) > 0
and len(newValue) == 0:
798 'Input file argument(s) {0!s} globbed to NO input files - ls command failed')
800 msg.debug (
'File input is globbed to %s' % self.
_value)
804 'Multiple file arguments are not supported for {0} (was given: {1}'.
format(self, self.
_value))
812 if value
not in (
'input',
'output',
'temporary'):
814 'File arguments must be specified as input, output or temporary - got {0}'.
format(value))
861 m = re.match(
r'(input|output|tmp.)([A-Za-z0-9_]+?)(File)?$', value)
863 msg.debug(
"ArgFile name setter matched this: {0}".
format(m.groups()))
864 if self.
_type is None:
865 dtype = m.group(2).
split(
'_', 1)[0]
867 if re.match(
r'D(RAW|ESD|AOD)', dtype):
869 msg.debug(
"Autoset data type to {0}".
format(dtype))
872 msg.debug(
"Autoset data subtype to {0}".
format(m.group(2)))
875 msg.debug(
"ArgFile name setter did not match against '{0}'".
format(value))
897 events = self.
getSingleMetadata(fname=fname, metadataKey=
'nentries', populate =
not fast)
899 msg.debug(
'Got events=None for file {0} - returning None for this instance'.
format(fname))
901 if events ==
'UNDEFINED':
902 msg.debug(
'Got events=UNDEFINED for file {0} - returning UNDEFINED for this instance'.
format(fname))
904 if not isinstance(events, int):
905 msg.warning(
'Got unexpected events metadata for file {0}: {1!s} - returning None for this instance'.
format(fname, events))
907 totalEvents += events
918 if files == []
or '_fileMetadata' not in dir(self):
920 for fname
in self.
value:
924 if fname
in self.
value:
931 if self.
_guid is not None:
932 msg.debug(
'Now trying to set file GUID metadata using {0}'.
format(self.
_guid))
937 msg.warning(
'Explicit GUID {0} was passed for file {1}, but this file is not a member of this instance'.
format(guid, fname))
945 def getMetadata(self, files = None, metadataKeys = None, maskMetadataKeys = None, populate = True, flush = False):
949 elif isinstance(files, str):
951 msg.debug(
'getMetadata will examine these files: {0!s}'.
format(files))
953 if metadataKeys
is None:
955 elif isinstance(metadataKeys, str):
956 metadataKeys = [metadataKeys,]
957 if maskMetadataKeys
is not None:
958 metadataKeys = [k
for k
in metadataKeys
if k
not in maskMetadataKeys]
959 msg.debug(
'getMetadata will retrieve these keys: {0!s}'.
format(metadataKeys))
962 msg.debug(
'Flushing cached metadata values')
966 msg.debug(
'Checking metadata values')
972 for mdkey
in metadataKeys:
978 msg.error(
'Did not find metadata key {0!s} for file {1!s} - setting to None'.
format(mdkey, fname))
979 metadata[fname][mdkey] =
None
990 if not (isinstance(fname, str)
and isinstance(metadataKey, str)):
992 'Illegal call to getSingleMetadata function: {0!s} {1!s}'.
format(fname, metadataKey))
993 md = self.
getMetadata(files = fname, metadataKeys = metadataKey, populate = populate, flush = flush)
994 return md[fname][metadataKey]
1001 msg.debug(
'Retrieving metadata keys {1!s} for files {0!s}'.
format(files, metadataKeys))
1012 for key
in metadataKeys:
1013 if key !=
'_exists':
1017 for key
in metadataKeys:
1019 msg.debug(
'Metadata key {0} is unknown for {1}'.
format(key, self.__class__.__name__))
1023 msg.debug(
'Found cached value for {0}:{1} = {2!s}'.
format(fname, key, self.
_fileMetadata[fname][key]))
1025 msg.debug(
'No cached value for {0}:{1}. Calling generator function {2} ({3})'.
format(fname, key, self.
_metadataKeys[key].__name__, self.
_metadataKeys[key]))
1028 msg.info(
"Metadata generator called to obtain {0} for {1}".
format(key, files))
1031 msg.error(
'Calling {0!s} raised an exception: {1!s}'.
format(self.
_metadataKeys[key].__name__, e))
1033 msg.warning(
'Call to function {0} for {1} file {2} failed to populate metadata key {3}'.
format(self.
_metadataKeys[key].__name__, self.__class__.__name__, fname, key))
1054 for k, v
in metadataKeys.items():
1055 msg.debug(
'Manualy setting {0} for file {1} to {2}'.
format(k, fname, v))
1066 msg.debug(
'Testing for cached values for files {0} and keys {1}'.
format(files, metadataKeys))
1069 elif isinstance(files, str):
1071 if metadataKeys
is None:
1073 elif isinstance(metadataKeys, str):
1074 metadataKeys = (metadataKeys,)
1078 for key
in metadataKeys:
1080 isCachedFlag =
False
1082 if isCachedFlag
is False:
1097 for filename
in self.
_value:
1098 if filename.find(
'#') > -1:
1099 (dataset, fname) = filename.split(
'#', 1)
1100 newValue.append(fname)
1101 msg.debug(
'Current dataset: {0}; New dataset {1}'.
format(self.
_dataset, dataset))
1104 'Found inconsistent dataset assignment in argFile setup: %s != %s' % (self.
_dataset, dataset))
1106 if len(newValue) == 0:
1108 elif len(newValue) != len (self.
_value):
1110 'Found partial dataset assignment in argFile setup from {0} (dsn#lfn notation must be uniform for all inputs)'.
format(self.
_value))
1122 except OSError
as e:
1123 msg.error(
'Got exception {0!s} raised while stating file {1}'.
format(e, fname))
1127 msg.debug(
'Calling ROOT TFile.GetSize({0})'.
format(fname))
1138 with open(fname)
as f:
1141 chunk = len(f.read(1024*1024))
1142 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1146 except OSError
as e:
1147 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1149 except UnicodeDecodeError:
1150 msg.debug(
'Problem reading file as unicode, attempting with binary')
1153 with open(fname,
'rb')
as f:
1156 chunk = len(f.read(1024*1024))
1157 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1161 except OSError
as e:
1162 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1171 msg.debug(
'Generating a GUID for file {0}'.
format(fname))
1182 msg.debug(
'Testing existance for {0}'.
format(files))
1183 def split_filelist(fn):
1184 if self.
io !=
'output':
1186 file_split_regex = re.compile(
r"(.+)\[(.+)](.+)")
1187 if (
'[' in fn)
and (
']' in fn):
1188 match = file_split_regex.match(fn)
1189 return [f
"{match.group(1)}{it}{match.group(3)}" for it
in match.group(2).
split(
',')]
1193 file_list = split_filelist(fname)
1196 size = map(
lambda fn: os.stat(fn).st_size, file_list)
1199 msg.debug(
'POSIX file {0} exists (or all elements of list)'.
format(fname))
1200 except OSError
as e:
1201 msg.error(
'Got exception {0!s} raised while stating file {1} (or some element of list) - probably it does not exist'.
format(e, fname))
1205 msg.debug(
'Calling ROOT TFile.GetSize on {0} (or elements of list)'.
format(fname))
1206 size = map(ROOTGetSize, file_list)
1209 msg.error(
'Non-POSIX file {0} (or element of list) could not be opened - probably it does not exist'.
format(fname))
1211 msg.debug(
'Non-POSIX file {0} (or all elements of list) exists'.
format(fname))
1227 for arg
in copyArgs:
1229 myargdict[arg] = copy.copy(argdict[arg])
1232 myargdict = copy.copy(argdict)
1234 myargdict[
'checkEventCount'] =
argSubstepBool(
'False', runarg=
False)
1236 if 'athenaopts' in myargdict:
1239 for subStep
in myargdict[
'athenaopts'].value:
1242 for opt
in myargdict[
'athenaopts'].value[subStep]:
1243 if opt.startswith(
'--nprocs'):
1247 elif opt.startswith(
'--threads'):
1253 if hasNprocs
and hasNthreads:
1256 if opt.startswith(
'--threads'):
1265 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1266 super(argYODAFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1272 'lheSumOfNegWeights': 0,
1276 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1281 tar = tarfile.open(fname,
"r:gz")
1283 for untar
in tar.getmembers():
1284 fileTXT = tar.extractfile(untar)
1285 if fileTXT
is not None :
1286 lines = fileTXT.read().
decode(
"utf-8")
1287 lhecount = lines.count(
'/event')
1291 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1295 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1303 tar = tarfile.open(fname,
"r:gz")
1304 for untar
in tar.getmembers():
1305 fileTXT = tar.extractfile(untar)
1307 if fileTXT
is not None :
1308 for line
in fileTXT :
1309 line = line.decode(
"utf-8")
1312 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1313 if w > 0 : weightPos += w
1314 else : weightNeg += abs(w)
1318 if "<event" in line :
1324 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1331 def __init__(self, value = list(), type=
None, subtype=
None, io =
'output', splitter=
',', runarg=
True, multipleOK =
None,
1332 name=
None, executor=
list(), mergeTargetSize=-1, auxiliaryFile=
False):
1333 super(argAthenaFile, self).
__init__(value=value, subtype=subtype, io=io, type=type, splitter=splitter, runarg=runarg,
1334 multipleOK=multipleOK, name=name, executor=executor, mergeTargetSize=mergeTargetSize,
1335 auxiliaryFile=auxiliaryFile)
1338 for key
in athFileInterestingKeys:
1347 msg.debug(
'Will retrieve metadata info for {0!s}'.
format(myFiles))
1355 for fname
in myFiles:
1357 if athFileMetadata
is None:
1359 msg.debug(
'Setting metadata for file {0} to {1}'.
format(fname, athFileMetadata[fname]))
1364 self.
_callAthInfo(files, doAllFiles =
True, retrieveKeys=athFileInterestingKeys)
1368 desc=super(argAthenaFile, self).prodsysDescription
1375 integrityFunction =
"returnIntegrityOfBSFile"
1380 rc=
call([
"AtlListBSEvents",
"-c", fname], logger=msg, message=
"Report by AtlListBSEvents: ", timeout=600)
1390 desc=super(argBSFile, self).prodsysDescription
1401 msg.debug(
'selfMerge attempted for {0} -> {1} with {2} (index {3})'.
format(inputs, output, argdict, counter))
1404 for fname
in inputs:
1405 if fname
not in self.
_value:
1407 "File {0} is not part of this agument: {1}".
format(fname, self))
1413 myargdict[
'maskEmptyInputs'] =
argBool(
True)
1414 myargdict[
'allowRename'] =
argBool(
True)
1415 myargdict[
'emptyStubFile'] =
argString(inputs[0])
1420 myDataDictionary = {
'BS_MRG_INPUT' :
argBSFile(inputs, type=self.
type, io=
'input'),
1421 'BS_MRG_OUTPUT' :
argBSFile(output, type=self.
type, io=
'output')}
1422 myMergeConf = executorConfig(myargdict, myDataDictionary)
1423 myMerger = bsMergeExecutor(name=
'BSMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf, exe =
'file_merging',
1424 inData=
set([
'BS_MRG_INPUT']), outData=
set([
'BS_MRG_OUTPUT']))
1425 myMerger.doAll(input=
set([
'BS_MRG_INPUT']), output=
set([
'BS_MRG_OUTPUT']))
1429 for fname
in inputs:
1433 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1442 integrityFunction =
"returnIntegrityOfPOOLFile"
1448 from PyJobTransforms.trfValidateRootFile
import checkFile
1449 rc=
checkFile(fileName=fname, the_type=
'event', requireTree=
False)
1457 desc=super(argPOOLFile, self).prodsysDescription
1466 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1469 for fname
in inputs:
1470 if fname
not in self.
_value:
1472 "File {0} is not part of this agument: {1}".
format(fname, self))
1482 myDataDictionary = {
'POOL_MRG_INPUT' :
argPOOLFile(inputs, type=self.
type, io=
'input'),
1484 myMergeConf = executorConfig(myargdict, myDataDictionary)
1485 myMerger = athenaExecutor(name=
'POOLMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1486 skeletonCA =
'RecJobTransforms.MergePool_Skeleton',
1487 inData=
set([
'POOL_MRG_INPUT']), outData=
set([
'POOL_MRG_OUTPUT']),
1488 disableMT=
True, disableMP=
True)
1489 myMerger.doAll(input=
set([
'POOL_MRG_INPUT']), output=
set([
'POOL_MRG_OUTPUT']))
1493 for fname
in inputs:
1497 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1503 integrityFunction =
"returnIntegrityOfPOOLFile"
1507 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1510 for fname
in inputs:
1511 if fname
not in self.
_value:
1513 "File {0} is not part of this agument: {1}".
format(fname, self))
1516 mySubstepName =
'HITSMergeAthenaMP{0}'.
format(counter)
1520 myDataDictionary = {
'HITS' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1522 myMergeConf = executorConfig(myargdict, myDataDictionary)
1523 myMerger = athenaExecutor(name = mySubstepName,
1524 skeletonCA =
'SimuJobTransforms.HITSMerge_Skeleton',
1526 inData=
set([
'HITS']), outData=
set([
'HITS_MRG']),
1527 disableMT=
False, disableMP=
True)
1528 myMerger.doAll(input=
set([
'HITS']), output=
set([
'HITS_MRG']))
1532 for fname
in inputs:
1536 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1543 integrityFunction =
"returnIntegrityOfPOOLFile"
1547 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1550 for fname
in inputs:
1551 if fname
not in self.
_value:
1553 "File {0} is not part of this agument: {1}".
format(fname, self))
1556 mySubstepName =
'EVNT_TRMergeAthenaMP{0}'.
format(counter)
1562 myMergeConf = executorConfig(myargdict, myDataDictionary)
1563 myMerger = athenaExecutor(name = mySubstepName, skeletonFile =
'SimuJobTransforms/skeleton.EVNT_TRMerge.py',
1565 inData=
set([
'EVNT_TR']), outData=
set([
'EVNT_TR_MRG']),
1566 disableMT=
False, disableMP=
True)
1567 myMerger.doAll(input=
set([
'EVNT_TR']), output=
set([
'EVNT_TR_MRG']))
1571 for fname
in inputs:
1575 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1582 integrityFunction =
"returnIntegrityOfPOOLFile"
1586 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1589 for fname
in inputs:
1590 if fname
not in self.
_value:
1592 "File {0} is not part of this agument: {1}".
format(fname, self))
1598 myDataDictionary = {
'RDO' :
argHITSFile(inputs, type=self.
type, io=
'input'),
1600 myMergeConf = executorConfig(myargdict, myDataDictionary)
1601 myMerger = athenaExecutor(name =
'RDOMergeAthenaMP{0}'.
format(counter),
1602 skeletonCA =
'SimuJobTransforms.RDOMerge_Skeleton',
1604 inData=
set([
'RDO']), outData=
set([
'RDO_MRG']),
1605 disableMT=
False, disableMP=
True)
1606 myMerger.doAll(input=
set([
'RDO']), output=
set([
'RDO_MRG']))
1610 for fname
in inputs:
1614 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1620 integrityFunction =
"returnIntegrityOfPOOLFile"
1624 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1627 for fname
in inputs:
1628 if fname
not in self.
_value:
1630 "File {0} is not part of this agument: {1}".
format(fname, self))
1633 mySubstepName =
'EVNTMergeAthenaMP{0}'.
format(counter)
1637 myDataDictionary = {
'EVNT' :
argEVNTFile(inputs, type=self.
type, io=
'input'),
1639 myMergeConf = executorConfig(myargdict, myDataDictionary)
1640 myMerger = athenaExecutor(name = mySubstepName, skeletonCA =
'EvgenJobTransforms.EVNTMerge_Skeleton',
1642 inData=
set([
'EVNT']), outData=
set([
'EVNT_MRG']),
1643 disableMT=
False, disableMP=
True)
1644 myMerger.doAll(input=
set([
'EVNT']), output=
set([
'EVNT_MRG']))
1648 for fname
in inputs:
1652 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1660 integrityFunction =
"returnIntegrityOfHISTFile"
1662 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', runarg=
True, countable=
True, multipleOK =
None,
1663 name=
None, auxiliaryFile=
False):
1664 super(argHISTFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1665 name=name, auxiliaryFile=auxiliaryFile)
1684 msg.error(
'Timeout counting events for {0}'.
format(fname))
1688 desc=super(argHISTFile, self).prodsysDescription
1696 integrityFunction =
"returnIntegrityOfNTUPFile"
1701 def __init__(self, value=list(), io =
'output', type=
None, subtype=
None, splitter=
',', treeNames=
None, runarg=
True, multipleOK =
None,
1702 name=
None, mergeTargetSize=-1, auxiliaryFile=
False):
1703 super(argNTUPFile, self).
__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1704 name=name, mergeTargetSize=mergeTargetSize, auxiliaryFile=auxiliaryFile)
1713 if name
and 'NTUP_PILEUP' in name:
1719 msg.debug(
'Retrieving event count for NTUP files {0}'.
format(files))
1724 if myPRWEntries
is not None:
1726 if self.
name and 'NTUP_PILEUP' in self.
name:
1727 myPRWEntries =
PRWEntries(fileName=fname, integral=
True)
1732 if myPHYSVALEntries
is not None:
1734 if self.
name and 'NTUP_PHYSVAL' in self.
name:
1736 self.
_fileMetadata[fname][
'sumOfWeights'] = myPHYSVALEntries
1738 msg.debug(
'treeNames is set to None - event count undefined for this NTUP')
1745 msg.error(
'Timeout counting events for {0}'.
format(fname))
1750 from PyJobTransforms.trfValidateRootFile
import checkFile
1751 rc=
checkFile(fileName=fname, the_type=
'basket', requireTree=
False)
1759 msg.debug(
'selfMerge attempted for {0} -> {1} with {2}'.
format(inputs, output, argdict))
1762 for fname
in inputs:
1763 if fname
not in self.
_value:
1765 "File {0} is not part of this agument: {1}".
format(fname, self))
1773 myDataDictionary = {
'NTUP_MRG_INPUT' :
argNTUPFile(inputs, type=self.
type, io=
'input'),
1775 myMergeConf = executorConfig(myargdict, myDataDictionary)
1776 myMerger = NTUPMergeExecutor(name=
'NTUPMergeAthenaMP{0}{1}'.
format(self.
_subtype, counter), conf=myMergeConf,
1777 inData=
set([
'NTUP_MRG_INPUT']), outData=
set([
'NTUP_MRG_OUTPUT']))
1778 myMerger.doAll(input=
set([
'NTUP_MRG_INPUT']), output=
set([
'NYUP_MRG_OUTPUT']))
1782 for fname
in inputs:
1786 msg.debug(
'Post self-merge files are: {0}'.
format(self.
_value))
1792 desc=super(argNTUPFile, self).prodsysDescription
1803 f = bz2.BZ2File(fname,
'r')
1805 chunk = len(f.read(1024*1024))
1806 msg.debug(
'Read {0} bytes from {1}'.
format(chunk, fname))
1811 except OSError
as e:
1812 msg.error(
'Got exception {0!s} raised while checking integrity of file {1}'.
format(e, fname))
1818 desc=super(argBZ2File, self).prodsysDescription
1824 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1825 super(argFTKIPFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1835 f = bz2.BZ2File(fname,
'r')
1837 if line.startswith(
'F'):
1840 except OSError
as e:
1841 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1846 desc=super(argFTKIPFile, self).prodsysDescription
1852 def __init__(self, value=list(), io =
'output', type=
'txt_evt', splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1853 super(argHepEvtAsciiFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg,
1854 multipleOK=multipleOK, name=name)
1864 tar = tarfile.open(fname,
"r:gz")
1865 for untar
in tar.getmembers():
1866 fileTXT = tar.extractfile(untar)
1867 if fileTXT
is not None:
1869 for aline
in fileTXT:
1870 if aline.startswith(b
'E '):
1873 except OSError
as e:
1874 msg.error(
'Event count for file {0} failed: {1!s}'.
format(fname, e))
1879 def __init__(self, value=list(), io =
'output', type=
None, splitter=
',', runarg=
True, multipleOK=
None, name=
None):
1880 super(argLHEFile, self).
__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1886 'lheSumOfNegWeights': 0,
1890 msg.debug(
'Retrieving event count for LHE file {0}'.
format(files))
1895 tar = tarfile.open(fname,
"r:gz")
1897 for untar
in tar.getmembers():
1898 fileTXT = tar.extractfile(untar)
1899 if fileTXT
is not None :
1900 lines = fileTXT.read().
decode(
"utf-8")
1901 lhecount = lines.count(
'/event')
1905 msg.debug(
'Entries is set to None - event count undefined for this LHE')
1909 msg.debug(
'Retrieving weight count for LHE file {0}'.
format(files))
1917 tar = tarfile.open(fname,
"r:gz")
1918 for untar
in tar.getmembers():
1919 fileTXT = tar.extractfile(untar)
1921 if fileTXT
is not None :
1922 lines = fileTXT.readlines()
1926 w =
float(re.sub(
' +',
' ',line).
split(
" ")[2])
1927 if w > 0 : weightPos += w
1928 else : weightNeg += abs(w)
1932 if "<event" in line :
1938 msg.debug(
'Entries is set to None - negative fraction count undefined for this LHE')
1939 self.
_fileMetadata[fname][
'lheSumOfPosWeights'] =
'UNDEFINED'
1940 self.
_fileMetadata[fname][
'lheSumOfNegWeights'] =
'UNDEFINED'
1951 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', separator = ':
'):
1954 super(argSubstep, self).__init__(value, runarg, name)
1964 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
1967 elif isinstance(value, str):
1969 elif isinstance(value, (list, tuple)):
1973 if not isinstance(item, str):
1976 elif isinstance(value, dict):
1988 subStepMatch = re.match(
r'([a-zA-Z0-9,]+)' + self.
_separator +
r'(.*)', string)
1991 subStep = subStepMatch.group(1).
split(
',')
1992 subStepValue = subStepMatch.group(2)
1995 subStepValue = string
1996 msg.debug(
'Parsed {0} as substep {1}, argument {2}'.
format(string, subStep, subStepValue))
1997 for step
in subStep:
1998 subStepList.append((step, subStepValue))
2010 substep = exe.substep
2011 first = exe.conf.firstExecutor
2018 value = self.
_value[name]
2019 elif substep
in self.
_value:
2020 value = self.
_value[substep]
2021 elif first
and 'first' in self.
_value:
2022 value = self.
_value[
'first']
2023 elif 'default' in self.
_value:
2024 value = self.
_value[
'default']
2036 value = self.
_value[
'all']
2037 elif isinstance(value, list):
2038 value = self.
_value[
'all'] + value
2040 msg.debug(
'From substep argument {myvalue} picked value "{value}" for {name}, {substep}, first={first}'.
format(myvalue=self.
_value, value=value, name=name, substep=substep, first=first))
2046 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2061 def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', splitter = None, separator=':
'):
2063 super(argSubstepList, self).__init__(value, runarg, name, defaultSubstep, separator)
2073 desc = {
'type':
'substep',
'substeptype':
'list',
'listtype':
'str',
2079 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2082 elif isinstance(value, str):
2084 elif isinstance(value, (list, tuple)):
2088 if not isinstance(item, str):
2091 for subStep
in subStepList:
2092 if subStep[0]
in self.
_value:
2095 self.
_value[subStep[0]] = subStep[1]
2096 elif isinstance(value, dict):
2097 for k, v
in value.items():
2098 if not isinstance(k, str):
2100 if not isinstance(v, list):
2111 subStepList = [(s[0], s[1].
split(self.
_splitter))
for s
in subStepList]
2113 subStepList = [(s[0], [s[1]])
for s
in subStepList]
2126 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2132 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2135 elif isinstance(value, str):
2137 self.
_value = dict([(subStep[0], subStep[1])
for subStep
in subStepList])
2138 elif isinstance(value, (list, tuple)):
2142 if not isinstance(item, str):
2145 for subStep
in subStepList:
2146 self.
_value[subStep[0]] = subStep[1]
2147 elif isinstance(value, dict):
2148 for k, v
in value.items():
2149 if not isinstance(k, str):
2151 if not isinstance(v, str):
2167 desc = {
'type':
'substep',
'substeptype':
'bool',
'separator': self.
_separator,
2173 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1})'.
format(value,
type(value)))
2176 elif isinstance(value, bool):
2178 elif isinstance(value, str):
2180 self.
_value = dict([(subStep[0],
strToBool(subStep[1]))
for subStep
in subStepList])
2181 elif isinstance(value, (list, tuple)):
2185 if not isinstance(item, str):
2188 for subStep
in subStepList:
2190 elif isinstance(value, dict):
2191 for k, v
in value.items():
2192 if not isinstance(k, str):
2194 if not isinstance(v, bool):
2211 desc = {
'type':
'substep',
'substeptype':
'int',
'separator': self.
_separator,
2217 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2221 elif isinstance(value, int):
2223 elif isinstance(value, str):
2225 self.
_value = dict([(subStep[0],
int(subStep[1]))
for subStep
in subStepList])
2226 elif isinstance(value, (list, tuple)):
2230 if not isinstance(item, str):
2233 for subStep
in subStepList:
2234 self.
_value[subStep[0]] =
int(subStep[1])
2235 elif isinstance(value, dict):
2236 for k, v
in value.items():
2237 if not isinstance(k, str):
2239 if not isinstance(v, int):
2252 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
2255 super(argSubstepFloat, self).
__init__(value = value, runarg = runarg, name=name)
2259 desc = {
'type':
'substep',
'substeptype':
'float',
'separator': self.
_separator,
2262 desc[
'min'] = self.
_min
2264 desc[
'max'] = self.
_max
2275 msg.debug(
'Attempting to set argSubstep from {0!s} (type {1}'.
format(value,
type(value)))
2279 elif isinstance(value, float):
2281 elif isinstance(value, str):
2283 self.
_value = dict([(subStep[0],
float(subStep[1]))
for subStep
in subStepList])
2284 elif isinstance(value, (list, tuple)):
2288 if not isinstance(item, str):
2290 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2292 for subStep
in subStepList:
2294 elif isinstance(value, dict):
2295 for k, v
in value.items():
2296 if not isinstance(k, str):
2298 'Dictionary key {0!s} for substep is not a string'.
format(k))
2299 if not isinstance(v, float):
2301 'Dictionary value {0!s} for substep is not an float'.
format(v))
2305 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2308 if (self.
_min is not None and my_float < self.
_min)
or (self.
_max is not None and my_float > self.
_max):
2310 'argFloat value out of range: {0} is not between {1} and {2}'.
format(my_float, self.
_min, self.
_max))
2311 except ValueError
as e:
2313 'Failed to convert substep value {0} to float: {1}'.
format(value, e))
2329 'doRDO_TRIG': {
'RAWtoALL': [(
'in',
'-',
'RDO'), (
'in',
'+',
'RDO_TRIG'), (
'in',
'-',
'BS')]},
2330 'doOverlay': {
'HITtoRDO': [(
'in',
'-',
'HITS'), (
'out',
'-',
'RDO'), (
'out',
'-',
'RDO_FILT')],
2331 'Overlay': [(
'in',
'+', (
'HITS',
'RDO_BKG')), (
'out',
'+',
'RDO')]},
2332 'doFCtoDAOD': {
'Derivation': [(
'in',
'-',
'EVNT')]},
2333 'afterburn': {
'generate': [(
'out',
'-',
'EVNT')]},
2351 desc = {
'type':
'substep',
'substeptype':
'steering',
'listtype':
'str',
'separator': self.
_separator,
2360 msg.debug(
'Attempting to set argSubstepSteering from {0!s} (type {1})'.
format(value,
type(value)))
2364 elif isinstance(value, dict):
2366 for k, v
in value.items():
2367 if not isinstance(k, str)
or not isinstance(v, list):
2369 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2371 if not isinstance(subv, (list, tuple))
or len(subv) != 3
or subv[0]
not in (
'in',
'out')
or subv[1]
not in (
'+',
'-'):
2373 'Failed to convert dict {0!s} to argSubstepSteering'.
format(value))
2379 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2380 elif isinstance(value, (str, list, tuple)):
2381 if isinstance(value, str):
2383 self.
_dumpvalue = getattr(self,
"_dumpvalue", value)
2387 if not isinstance(item, str):
2389 'Failed to convert list item {0!s} to substep (should be a string)'.
format(item))
2390 if item
in argSubstepSteering.steeringAlises:
2391 msg.debug(
"Found value {0} in steeringAlises ({1})".
format(item, argSubstepSteering.steeringAlises[item]))
2392 for substep, steerlist
in argSubstepSteering.steeringAlises[item].
items():
2393 if substep
in self.
_value:
2396 self.
_value[substep] = steerlist
2402 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.
format(value,
type(value)))
2405 if string
in argSubstepSteering.steeringAlises:
2406 return argSubstepSteering.steeringAlises[string]
2410 for subvalue
in ivalue.split(
','):
2411 matchedParts = re.match(
r'(in|out)(\+|\-)([A-Z_]+)$', subvalue)
2412 if not matchedParts:
2414 'Failed to convert string {0!s} to argSubstepSteering'.
format(subvalue))
2415 retvalue.append((matchedParts.group(1), matchedParts.group(2), matchedParts.group(3)))
2427 msg.debug(
'Attempting to set argSubstepConditions from {0!s} (type {1}'.
format(value,
type(value)))
2429 super(self.__class__, self.__class__).value.fset(self, value)
2433 if "CurrentMC" == v:
2439 cmd =
"COMAGetGlobalTagNameByCurrentState --state=CurrentMC"
2440 return str(client.execute(cmd, format =
'dom_object').get_rows().pop()[
'globalTag'])
2444 desc = {
'type':
'substep',
'substeptype':
'str',
'separator': self.
_separator,
2462 super(trfArgParser, self).
__init__(*args, **kwargs)
2465 argName = args[0].lstrip(
'-')
2466 msg.debug(
'Found arg name {0}'.
format(argName))
2472 'Transform arguments may not use hyphens (use camelCase or underscore')
2477 msg.debug(
'Double definition of argument {0} - ignored'.
format(argName))
2481 if 'help' in kwargs:
2485 if 'action' in kwargs
and 'factory' in dir(kwargs[
'action']):
2486 self.
_argClass[argName] = kwargs[
'action'].factory
2487 elif 'type' in kwargs:
2488 self.
_argClass[argName] = kwargs[
'type']
2494 for arg
in (
'group',):
2496 strippedArgs[arg] = kwargs.pop(arg)
2500 for i
in range(1, len(args)):
2501 argAlias = args[i].lstrip(
'-')
2502 msg.debug(
'Adding an alias of {0}: {1}'.
format(argName, argAlias))
2506 if 'group' in strippedArgs:
2508 msg.debug(
'Adding argument to group {0}: ({1}; {2})'.
format(strippedArgs[
'group'], args, kwargs))
2512 msg.warning(
'Argument group {0} not defined - adding argument to main parser'.
format(strippedArgs[
'group']))
2513 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2514 super(trfArgParser, self).
add_argument(*args, **kwargs)
2516 msg.debug(
'Adding argument: ({0}; {1})'.
format(args, kwargs))
2517 super(trfArgParser, self).
add_argument(*args, **kwargs)
2523 msg.debug(
'Detected the local variable {0}'.
format(name))
2524 if argClass
is not None:
2525 desc[name] = argClass().prodsysDescription
2527 desc[name].update({
'help': self.
_helpString[name]})
2536 msg.warning(
'Argument group %s already exists', args[0])
2538 self.
_argGroups[args[0]] = self.add_argument_group(*args)
2551 keyArray = [
'--' +
str(key)
for key
in self.
_helpString if key
not in (
'h',
'verbose',
'loglevel',
'dumpargs',
'argdict') ]
2553 print(
'ListOfDefaultPositionalKeys={0}'.
format(keyArray))
2565 newValueObj = value[0]
2566 msg.debug(
'Started with: %s = %s',
type(newValueObj), newValueObj)
2567 if isinstance(value[0], argSubstep):
2570 elif isinstance(value[0], list):
2573 msg.debug(
'Handling a list of arguments for key')
2577 processedValueObj.value = processedValues
2578 newValues.append(processedValueObj)
2579 newValueObj = newValues
2580 return newValueObj, newValues
2581 elif isinstance(value[0].value, list):
2582 newValues = value[0].value
2583 elif isinstance(value[0].value, dict):
2584 newValues = value[0].value
2586 newValues = [value[0].value,]
2587 for valueObj
in value[1:]:
2588 msg.debug(
'Value Object: %s = %s',
type(valueObj), valueObj)
2589 if isinstance(value[0], argSubstep):
2592 elif isinstance(valueObj.value, list):
2594 newValues.extend(valueObj.value)
2595 elif isinstance(valueObj.value, dict):
2597 newValues.update(valueObj.value)
2599 newValues.append(valueObj.value)
2600 return newValueObj, newValues
2609 super(trfArgParser, self).
parse_args(args = args, namespace = namespace)
2611 namespace = super(trfArgParser, self).
parse_args(args = args)
2612 for k, v
in namespace.__dict__.items():
2613 msg.debug(
'Treating key %s (%s)', k, v)
2614 if isinstance(v, list):
2616 if not isinstance(newValueObj, list):
2617 newValueObj.value = newValues
2618 namespace.__dict__[k] = newValueObj
2619 msg.debug(
'Set to %s', newValues)
2627 msg.debug(
"converting string {string} to boolean".
format(string = string))
2628 if string.lower() ==
'false':
2630 elif string.lower() ==
'true':
2634 except AttributeError:
2648 allKeys =
set(dict1) |
set(dict2)
2652 if isinstance(
list(dict1.values())[0], list):
2654 elif len(dict2) > 0:
2655 if isinstance(
list(dict2.values())[0], list):
2659 mergeDict[key] = dict1.get(key, []) + dict2.get(key, [])
2662 if key
in dict1
and key
in dict2:
2664 if dict1[key] != dict2[key]:
2666 'Merging substep arguments found clashing values for substep {0}: {1}!={2}'.
format(key, dict1[key], dict2[key]))
2667 mergeDict[key] = dict1[key]
2669 mergeDict[key] = dict1[key]
2671 mergeDict[key] = dict2[key]