ATLAS Offline Software
Loading...
Searching...
No Matches
trfArgClasses.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
6
7import argparse
8import bz2
9import copy
10import glob
11import os
12import re
13import subprocess
14import uuid
15
16import logging
17msg = logging.getLogger(__name__)
18
19import PyJobTransforms.trfExceptions as trfExceptions
20
21from PyJobTransforms.trfFileUtils import athFileInterestingKeys, AthenaLiteFileInfo, NTUPEntries, HISTEntries, PHYSVALEntries, PRWEntries, urlType, ROOTGetSize
22from PyJobTransforms.trfUtils import call
23from PyJobTransforms.trfExeStepTools import commonExecutorStepName
24from PyJobTransforms.trfExitCodes import trfExit as trfExit
25from PyJobTransforms.trfDecorators import timelimited
26from PyJobTransforms.trfAMI import getAMIClient
27
28
29
    ## @brief Initialise the factory with the class to instantiate and any
    #  fixed arguments to pass to its constructor
    #  @param genclass Class object instantiated on each call
    #  @param args Positional arguments forwarded to every @c genclass construction
    #  @param kwargs Keyword arguments forwarded to every @c genclass construction
    def __init__(self, genclass, *args, **kwargs):
        msg.debug('Initialised class %s with args=%s; kwargs=%s', genclass, args, kwargs)
        self._genclass = genclass
        self._args = args
        self._kwargs = kwargs

    ## @brief Instantiate the wrapped class from a command line value string
    #  @param valueString Raw value from the command line; @c None means "use
    #  the constructor's own default value"
    #  @return The newly constructed instance
    def __call__(self, valueString=None):
        msg.debug('Called class %s with value=%s; args=%s; kwargs=%s', self._genclass, valueString, self._args, self._kwargs)

        # Wrap this step in our own try/except because if this goes wrong we want to see the exception
        # instead of having it masked by the argparse module
        try:
            # Passing None suppresses the value passed to the constructor, thus the constructor's own
            # default value is used - generally this will match the default value for the underlying
            # python object
            if valueString is None:
                obj = self._genclass(*self._args, **self._kwargs)
            else:
                obj = self._genclass(valueString, *self._args, **self._kwargs)
        except Exception as e:
            msg.fatal('Got this exception raised when calling object factory: {0}'.format(e))
            raise
        return obj

    ## @brief Human readable representation of this factory
    def __str__(self):
        return 'argFactory for {0}, args {1}, kwargs {2}'.format(self._genclass, self._args, self._kwargs)
58
59
60
## @brief Custom argparse action that converts raw option values through an argFactory
class argAction(argparse.Action):
    def __init__(self, factory, option_strings, dest, **kwargs):
        # Remember the factory used to transform each raw value
        self._factory = factory
        super().__init__(option_strings, dest, **kwargs)

    ## @brief Convert @c values via the factory and store the result on the namespace
    def __call__(self, parser, namespace, values, option_string=None):
        msg.debug('Called action for factory=%s; values=%s', self._factory, values)

        # call the factory for each value
        if isinstance(values, list):
            # An empty list maps to the factory's default (factory run on None)
            converted = [self._factory(None)] if not values else [self._factory(v) for v in values]
        else:
            converted = self._factory(values)
        setattr(namespace, self.dest, converted)
78
79
80
    ## @brief Initialise the action factory, wrapping an argFactory for the given class
    #  @param genclass Class to be instantiated by the underlying argFactory
    #  @param args Positional arguments forwarded to the argFactory
    #  @param kwargs Keyword arguments forwarded to the argFactory
    def __init__(self, genclass, *args, **kwargs):
        msg.debug('Initialised action class %s with args=%s; kwargs=%s', genclass, args, kwargs)
        self._factory = argFactory(genclass, *args, **kwargs)

    ## @brief The underlying argFactory instance
    @property
    def factory(self):
        return self._factory

    ## @brief Construct an argAction bound to this factory
    #  (argparse calls this with the option strings and destination)
    def __call__(self, option_strings, dest, **kwargs):
        return argAction(self._factory, option_strings, dest, **kwargs)

    ## @brief Human readable representation of this action factory
    def __str__(self):
        return 'argActionFactory for {0}'.format(self._factory)
97
98
99
103
104
    ## @brief Initialise the base argument
    #  @param value Initial value, passed through the @c value property setter
    #  @param runarg If @c True this argument is a runtime argument for the job
    #  @param name Name of this argument (may be set later by the parser)
    def __init__(self, value = None, runarg = True, name = None):
        self._runarg = runarg
        self._name = name

        ## @note Derived classes give None a sensible meaning in their own
        #  value setters (usually their type's natural default)
        self.value = value

    ## @brief Current value of this argument
    @property
    def value(self):
        return self._value

    ## @brief Set the value (the base class accepts anything; derived classes
    #  convert and validate in their overriding setters)
    @value.setter
    def value(self, value):
        self._value = value

    ## @brief True if this argument is a runtime argument
    @property
    def isRunarg(self):
        return self._runarg

    ## @brief Name of this argument
    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        self._name = value
146
147 @property
149 desc = {'type' : None}
150 return desc
151
152
    ## @brief Human readable representation: class name, value and runarg flag
    def __str__(self):
        return '{0}: Value {1} (isRunArg={2})'.format(self.__class__.__name__, self._value, self._runarg)

    ## @brief Repr delegates to the repr of the payload value
    def __repr__(self):
        return repr(self.value)

    ## @brief Equality compares payload values only
    #  (@c other must also expose a @c value attribute)
    def __eq__(self,other):
        return self.value == other.value
163
164 def __nq__(self, other):
165 return self.value != other.value
166
    ## @brief Less-than comparison on payload values
    def __lt__(self, other):
        return self.value < other.value

    ## @brief Greater-than comparison on payload values
    def __gt__(self, other):
        return self.value > other.value
172
173
175
176
    ## @brief Initialise a string argument
    #  @param value Initial value (converted via the @c value setter)
    #  @param runarg If @c True this is a runtime argument
    #  @param name Argument name
    #  @param choices Optional list of valid values for this argument
    def __init__(self, value = None, runarg = True, name = None, choices = None):
        self._choices = choices
        super(argString, self).__init__(value = value, runarg = runarg, name=name)
185
186
188 @property
189 def value(self):
190 return self._value
191
192
194 @value.setter
195 def value(self, value):
196 if value is None:
197 # For strings, None maps to ''
198 self._value = ''
199 else:
200 # Call string converter - should work for everything...
201 self._value = str(value)
202 if self._choices:
203 if self._value not in self._choices:
204 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CHOICES_FAIL'), 'Converted argument {0} for {1} did not match any valid choice: {2}'.format(value, self._name, self._choices))
205
206
208 def choices(self):
209 return self._choices
210
211 # prodsysDescription: human readable from of type plus possible values
212 @property
214 desc = {'type' : 'str'}
215 if self._choices:
216 desc['choices'] = self._choices
217 return desc
218
219
    ## @brief String arguments print as their bare value
    def __str__(self):
        return self.value
222
223
224
226
227
229 @property
230 def value(self):
231 return self._value
232
233
236 @value.setter
237 def value(self, value):
238 if value is None:
239 # For ints None maps to 0
240 self._value = 0
241 else:
242 if isinstance(value, int):
243 self._value = value
244 else:
245
246 try:
247 self._value = int(value)
248 except ValueError as e:
249 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
250 'Failed to convert value {0} to int: {1}'.format(value, e))
251
252 # prodsysDescription: human readable from of type plus possible values
253 @property
255 desc = {'type' : 'int'}
256 return desc
257
258
259
260
262
263
    ## @brief Initialise a float argument
    #  @param value Initial value (converted via the @c value setter)
    #  @param min Optional lower bound for the value
    #  @param max Optional upper bound for the value
    #  @param runarg If @c True this is a runtime argument
    #  @param name Argument name
    def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
        self._min = min
        self._max = max
        super(argFloat, self).__init__(value = value, runarg = runarg, name=name)

    ## @brief Current float value
    @property
    def value(self):
        return self._value
277
278 @property
280 desc = {'type' : 'float'}
281 if self._min:
282 desc['min'] = self._min
283 if self._max:
284 desc['max'] = self._max
285 return desc
286
287
    ## @brief Set value, converting to float and enforcing optional min/max bounds
    #  @throws trfExceptions.TransformArgException (TRF_ARG_CONV_FAIL) on conversion failure
    #  @throws trfExceptions.TransformArgException (TRF_ARG_OUT_OF_RANGE) if outside [min, max]
    @value.setter
    def value(self, value=None):
        # Default value will be 0.0 or self._min (if defined)
        if value is None:
            if self._min is not None:
                self._value = self._min
            else:
                self._value = 0.0
        else:
            try:
                if isinstance(value, float):
                    self._value = value
                else:
                    self._value = float(value)
            except ValueError:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                          'Failed to convert %s to a float' % str(value))

        # Range check applies also to the default taken in the None branch
        if (self._min is not None and self.value < self._min) or (self._max is not None and self._value > self._max):
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_OUT_OF_RANGE'),
                                                      'argFloat value out of range: %g is not between %s and %s' %
                                                      (self.value, self._min, self._max))
314
315
316
318
319
    ## @brief Current boolean value
    @property
    def value(self):
        return self._value

    ## @brief Set value, accepting bools directly and converting other input
    #  via the module's strToBool helper
    @value.setter
    def value(self, value):
        # Default value matches the python bool() constructor
        if value is None:
            self._value = False
        else:
            if isinstance(value, bool):
                self._value = value
            else:
                self._value = strToBool(value)
340
341 # prodsysDescription: human readable from of type plus possible values
342 @property
344 desc = {'type' : 'bool'}
345 return desc
346
347
349
350
    ## @brief Initialise a list-of-strings argument
    #  @param value Initial value (the mutable list default is safe here: the
    #  value setter copies it with list() and never mutates it)
    #  @param supressEmptyStrings If @c True empty strings are removed when splitting
    #  @param splitter Character used to split a single string into list elements
    #  @param runarg If @c True this is a runtime argument
    #  @param name Argument name
    def __init__(self, value = [], supressEmptyStrings = True, splitter=',', runarg=True, name=None):
        self._splitter = splitter
        self._supressEmptyStrings = supressEmptyStrings

        super(argList, self).__init__(value = value, runarg = runarg, name=name)

    ## @brief Current value as a python list
    @property
    def value(self):
        return self._value
367
368 # prodsysDescription: human readable from of type plus possible values
369 @property
371 desc = {'type' : 'list', 'listtype': 'str'}
372 if self._supressEmptyStrings:
373 desc['supress Empty Strings'] = self._supressEmptyStrings
374 return desc
375
376
377
    ## @brief Set value: lists/tuples are copied, None maps to [], strings are
    #  split on the configured splitter character
    #  @throws trfExceptions.TransformArgException (TRF_ARG_CONV_FAIL) if the
    #  value has no split() method (i.e. cannot be treated as a string)
    @value.setter
    def value(self, value):
        if isinstance(value, (list, tuple)):
            self._value = list(value)
        elif value is None:
            self._value = []
            return
        else:
            try:
                if self._supressEmptyStrings:
                    self._value = [ v for v in value.split(self._splitter) if v != '' ]
                else:
                    self._value = value.split(self._splitter)
            except AttributeError:
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                          'Failed to convert %s to a list' % str(value))
395
396
    ## @brief Append a single element to the list value
    def append(self, addme):
        self._value.append(addme)

    ## @brief Lists print space separated
    def __str__(self):
        return " ".join(self._value)

    ## @brief Repr as a comma separated list of element reprs
    def __repr__(self):
        return '[' + ','.join([ repr(s) for s in self._value ]) + ']'
411
412
413
415
    ## @brief Current value as a python list of ints
    @property
    def value(self):
        return self._value
420
421
422
426 @value.setter
427 def value(self, value):
428 if isinstance(value, list):
429 for v in value:
430 if not isinstance(v, int):
431 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'),
432 'Illegal value {0} in list of ints'.format(v))
433 self._value = value
434 elif value is None:
435 self._value = []
436 return
437 else:
438 try:
440 self._value = [ v for v in value.split(self._splitter) if v != '' ]
441 else:
442 self._value = value.split(self._splitter)
443 self._value = [ int(el) for el in self._value ]
444 except (AttributeError, ValueError):
445 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
446 'Failed to convert %s to a list of ints' % str(value))
447
    ## @brief Int lists print space separated
    def __str__(self):
        return " ".join([ str(el) for el in self._value ])
450
451 @property
453 desc = {'type' : 'list', 'listtype' : 'int'}
454 return desc
455
456
# Special list which stores k:v pairs, where the value is a float (used for AthenaMP merge target size)

    ## @brief Initialise a key:float-value list argument
    #  @param value Initial value (the mutable dict default is only ever read,
    #  never mutated; the setter replaces it wholesale)
    #  @param supressEmptyStrings If @c True empty fields are dropped when splitting
    #  @param splitter Character separating the k:v items in a string value
    #  @param kvsplitter Character separating key from value within one item
    #  @param runarg If @c True this is a runtime argument
    #  @param name Argument name
    def __init__(self, value = {}, supressEmptyStrings = True, splitter=',', kvsplitter=":", runarg=True, name=None):
        self._splitter = splitter
        self._kvsplitter = kvsplitter
        self._supressEmptyStrings = supressEmptyStrings

        # NOTE(review): this calls super(argList, ...), skipping one level of
        # the hierarchy so argList's own __init__ is bypassed while the base
        # argument class still initialises - presumably deliberate; confirm
        super(argList, self).__init__(value = value, runarg = runarg, name=name)
471
472
    ## @brief Current value as a python dict (str keys, float values)
    @property
    def value(self):
        return self._value
477
478
483 @value.setter
484 def value(self, value):
485 if isinstance(value, dict):
486 for k, v in value.items():
487 if not isinstance(k, str):
488 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'),
489 'Illegal key argument type {0} in dictionary for argKeyFloatValueList'.format(k))
490 if not isinstance(v, float):
491 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'),
492 'Illegal value argument type {0} in dictionary for argKeyFloatValueList'.format(v))
493 self._value = value
494 elif value is None:
495 self._value = {}
496 return
497 else:
498 self._value = {}
499 try:
500 if self._supressEmptyStrings:
501 kvlist = [ v for v in value.split(self._splitter) if v != '' ]
502 else:
503 kvlist = value.split(self._splitter)
504 for item in kvlist:
505 k, v = item.split(self._kvsplitter, 1)
506 self._value[k] = float(v)
507 except (AttributeError, ValueError):
508 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
509 'Failed to convert {0} to a dictionary of string:int values'.format(value))
510
    ## @brief Print as the dict's own str form
    def __str__(self):
        return str(self._value)
513
514 @property
516 desc = {'type' : 'list', 'listtype' : 'str:float'}
517 return desc
518
519
520
523
524
    ## @brief Initialise a file argument
    #  @param value Initial file value(s), processed by the @c value setter
    #  @param type Data type of this file (e.g. ESD, AOD); autodetected from
    #  the argument name by the name setter if left as @c None
    #  @param subtype Data subtype; also autodetected from the name if @c None
    #  @param io One of 'input', 'output' or 'temporary'
    #  @param splitter Character on which a multi-file string value is split
    #  @param runarg If @c True this is a runtime argument
    #  @param guid Optional dict mapping filenames to externally supplied GUIDs
    #  @param multipleOK Whether multiple files are acceptable; if @c None the
    #  default is True for input and False otherwise
    #  @param name Argument name
    #  @param executor Executor list associated with this file argument
    #  @param mergeTargetSize Target size for merged outputs; -1 disables
    #  @param auxiliaryFile If @c True this file is auxiliary to the main workflow
    #  @note NOTE(review): value=list() and executor=list() are shared mutable
    #  defaults; they are not mutated in this method, but a None sentinel would
    #  be safer - confirm before changing, as the value setter treats None
    #  differently from an empty list (early return, no _urlType update)
    def __init__(self, value=list(), type=None, subtype=None, io = 'output', splitter=',', runarg=True, guid=None,
                 multipleOK = None, name=None, executor=list(), mergeTargetSize=-1, auxiliaryFile=False):
        # Set these values before invoking super().__init__ to make sure they can be
        # accessed in our setter
        self._dataset = None
        self._urlType = None
        self._type = type
        self._subtype = subtype
        self._guid = guid
        self._mergeTargetSize = mergeTargetSize
        self._auxiliaryFile = auxiliaryFile
        self._originalName = None

        # User setter to get valid value check
        self.io = io

        self._exe = executor

        ## @note Metadata generator functions any file type supports; derived
        #  classes extend this dictionary with their own keys
        self._metadataKeys = {'file_size': self._getSize,
                              'integrity': self._getIntegrity,
                              'file_guid': self._generateGUID,
                              '_exists': self._exists,
                              }
        # Multiple files are allowed for input by default, but not otherwise
        if multipleOK is None:
            if self._io == 'input':
                self._multipleOK = True
            else:
                self._multipleOK = False
        else:
            self._multipleOK = multipleOK

        # N.B. the superclass __init__ invokes our value setter
        super(argFile, self).__init__(value=value, splitter=splitter, runarg=runarg, name=name)
586
587
588
    ## @brief Current list of files
    @property
    def value(self):
        return self._value

    ## @brief Set the file value; the full logic lives in valueSetter() so
    #  that derived classes can reuse it
    @value.setter
    def value(self, value):
        self.valueSetter(value)

    ## @brief Whether this argument accepts multiple files
    @property
    def multipleOK(self):
        return self._multipleOK

    @multipleOK.setter
    def multipleOK(self, value):
        self._multipleOK = value
611
612 @property
614 return self._mergeTargetSize
615
616
    @mergeTargetSize.setter
    def mergeTargetSize(self, value):
        # None is normalised to 0, i.e. no target size
        if value is None:
            self._mergeTargetSize = 0
        else:
            self._mergeTargetSize = value
623
624 @property
626 if isinstance(self._type, dict):
627 if self._type=={}:
628 desc = {'type' : 'file', 'subtype' : "NONE" }
629 else:
630 desc = {'type' : 'file', 'subtype' : dict((str(k).upper(), str(v).upper()) for (k,v) in self._type.items())}
631 else:
632 desc = {'type' : 'file', 'subtype' : str(self._type).upper()}
633 desc['multiple'] = self._multipleOK
634 return desc
635
636
    ## @brief Executor(s) associated with this file argument
    @property
    def executor(self):
        return self._exe
640
641
    ## @brief Set the argFile value, handling all accepted input formats:
    #  Tier-0 expanded dictionaries, plain lists/tuples, None, LFN strings and
    #  splitter-separated strings. Afterwards duplicates are removed, the URL
    #  type is detected and input files are globbed.
    def valueSetter(self, value):
        # First stage: normalise the incoming value to a list of filenames
        if isinstance(value, (list, tuple)):
            if len(value) > 0 and isinstance(value[0], dict): # Tier-0 style expanded argument with metadata
                self._value=[]
                for myfile in value:
                    try:
                        self._value.append(myfile['lfn'])
                        self._resetMetadata(files = [myfile['lfn']])
                    except KeyError:
                        raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                                  'Filename (key "lfn") not found in Tier-0 file dictionary: {0}'.format(myfile))
                    # Map known Tier-0 dictionary keys onto our metadata keys
                    for k, v in myfile.items():
                        if k == 'guid':
                            self._setMetadata([myfile['lfn']], {'file_guid': v})
                        elif k == 'events':
                            self._setMetadata([myfile['lfn']], {'nentries': v})
                        elif k == 'checksum':
                            self._setMetadata([myfile['lfn']], {'checksum': v})
                        elif k == 'dsn':
                            # All files must belong to the same dataset
                            if not self._dataset:
                                self.dataset = v
                            elif self.dataset != v:
                                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_DATASET'),
                                                                          'Inconsistent dataset names in Tier-0 dictionary: {0} != {1}'.format(self.dataset, v))
            else:
                self._value = list(value)
                self._getDatasetFromFilename(reset = False)
                self._resetMetadata()
        elif value is None:
            # N.B. early return: _urlType and the glob logic below are skipped
            self._value = []
            return
        else:
            try:
                if value.lower().startswith('lfn'):
                    # Resolve physical filename using pool file catalog.
                    from PyUtils.PoolFile import file_name
                    protocol, pfn = file_name(value)
                    self._value = [pfn]
                    self._getDatasetFromFilename(reset = False)
                    self._resetMetadata()
                else:
                    # Don't split output filename if it contains a list in square brackets
                    if self._io == 'output' and ('[' in value) and (']' in value):
                        self._value = [value]
                    else:
                        self._value = value.split(self._splitter)
                    self._getDatasetFromFilename(reset = False)
                    self._resetMetadata()
            except (AttributeError, TypeError):
                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                          'Failed to convert %s to a list' % str(value))

        # Remove duplicate entries, preserving the original order
        deDuplicatedValue = []
        for fname in self._value:
            if fname not in deDuplicatedValue:
                deDuplicatedValue.append(fname)
            else:
                msg.warning("Removing duplicated file {0} from file list".format(fname))
        if len(self._value) != len(deDuplicatedValue):
            self._value = deDuplicatedValue
            msg.warning('File list after duplicate removal: {0}'.format(self._value))

        # Find our URL type (if we actually have files!)
        # At the moment this is assumed to be the same for all files in this instance
        # although in principle one could mix different access methods in the one input file type
        if len(self._value) > 0:
            self._urlType = urlType(self._value[0])
        else:
            self._urlType = None

        # Second stage: for input files, expand wildcards
        if self._io == 'input':
            # POSIX files are globbed with the standard glob module
            if self._urlType == 'posix':
                msg.debug('Found POSIX filesystem input - activating globbing')
                newValue = []
                for filename in self._value:
                    # Simple case
                    globbedFiles = glob.glob(filename)
                    if len(globbedFiles) == 0: # No files globbed for this 'filename' argument.
                        raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                  'Input file argument {0} globbed to NO input files - probably the file(s) are missing'.format(filename))

                    globbedFiles.sort()
                    newValue.extend(globbedFiles)

                self._value = newValue
                msg.debug ('File input is globbed to %s' % self._value)

            elif self._urlType == 'root':
                msg.debug('Found root filesystem input - activating globbing')
                newValue = []
                for filename in self._value:
                    if str(filename).startswith("root"):
                        msg.debug('Found input file name starting with "root," setting XRD_RUNFORKHANDLER=1, which enables fork handlers for xrootd in direct I/O')
                        os.environ["XRD_RUNFORKHANDLER"] = "1"
                    # NOTE(review): due to operator precedence the '*'/'?' tests
                    # bind only to the last 'not endswith' alternative here -
                    # confirm the intended grouping before touching this
                    if str(filename).startswith("https") or str(filename).startswith("davs") or not(str(filename).endswith('/')) and '*' not in filename and '?' not in filename:
                        msg.debug('Seems that only one file was given: {0}'.format(filename))
                        newValue.extend(([filename]))
                    else:
                        # Hopefully this recognised wildcards...
                        path = filename
                        fileMask = ''
                        if '*' in filename or '?' in filename:
                            msg.debug('Split input into path for listdir() and a filemask to select available files.')
                            path = filename[0:filename.rfind('/')+1]
                            msg.debug('path: {0}'.format(path))
                            fileMask = filename[filename.rfind('/')+1:len(filename)]
                            msg.debug('Will select according to: {0}'.format(fileMask))

                        cmd = ['/afs/cern.ch/project/eos/installation/atlas/bin/eos.select' ]
                        if not os.access ('/afs/cern.ch/project/eos/installation/atlas/bin/eos.select', os.X_OK ):
                            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                      'No execute access to "eos.select" - could not glob EOS input files.')

                        cmd.extend(['ls'])
                        cmd.extend([path])

                        # Run an EOS listing and keep the lines that look like root files
                        myFiles = []
                        try:
                            proc = subprocess.Popen(args = cmd,bufsize = 1, shell = False, stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
                            rc = proc.wait()
                            output = proc.stdout.readlines()
                            if rc!=0:
                                raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                                          'EOS list command ("{0!s}") failed: rc {1}, output {2}'.format(cmd, rc, output))
                            msg.debug("eos returned: {0}".format(output))
                            # NOTE(review): proc.stdout yields bytes in python3
                            # unless text mode is requested - confirm these str
                            # operations see the expected type
                            for line in output:
                                if "root" in line:
                                    myFiles += [str(path)+str(line.rstrip('\n'))]

                            # Translate the shell wildcards to a regex and filter
                            patt = re.compile(fileMask.replace('*','.*').replace('?','.'))
                            for srmFile in myFiles:
                                if fileMask != '':
                                    if(patt.search(srmFile)) is not None:
                                        #if fnmatch.fnmatch(srmFile, fileMask):
                                        msg.debug('match: %s',srmFile)
                                        newValue.extend(([srmFile]))
                                else:
                                    newValue.extend(([srmFile]))

                            msg.debug('Selected files: %s', newValue)
                        except (AttributeError, TypeError, OSError):
                            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_RUNTIME_ERROR'),
                                                                      'Failed to convert %s to a list' % str(value))
                if len(self._value) > 0 and len(newValue) == 0:
                    # Woops - no files!
                    # NOTE(review): this message contains a {0!s} placeholder but
                    # no .format() call, so it is emitted literally - confirm
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_INPUT_FILE_ERROR'),
                                                              'Input file argument(s) {0!s} globbed to NO input files - ls command failed')
                self._value = newValue
                msg.debug ('File input is globbed to %s' % self._value)
        # Check if multiple outputs are ok for this object
        elif self._multipleOK is False and len(self._value) > 1:
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                                                      'Multiple file arguments are not supported for {0} (was given: {1}'.format(self, self._value))
805
    ## @brief I/O status of this file: 'input', 'output' or 'temporary'
    @property
    def io(self):
        return (self._io)

    @io.setter
    def io(self, value):
        # Only the three canonical values are accepted
        if value not in ('input', 'output', 'temporary'):
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_RUNTIME_ERROR'),
                                                      'File arguments must be specified as input, output or temporary - got {0}'.format(value))
        self._io = value

    ## @brief Dataset this file belongs to (None if unset)
    @property
    def dataset(self):
        return self._dataset

    @dataset.setter
    def dataset(self, value):
        self._dataset = value

    ## @brief Original name of this file
    #  @note The getter is (historically) spelled 'orignalName' while the
    #  setter method is named 'originalName', so both names exist on the
    #  class; kept as-is for backwards compatibility with existing callers
    @property
    def orignalName(self):
        return self._originalName

    @orignalName.setter
    def originalName(self, value):
        self._originalName = value

    ## @brief Data type of this file (e.g. RAW, ESD, AOD)
    @property
    def type(self):
        return self._type

    @type.setter
    def type(self, value):
        self._type = value

    ## @brief Data subtype of this file
    @property
    def subtype(self):
        return self._subtype

    @subtype.setter
    def subtype(self, value):
        self._subtype = value

    ## @brief Name of this argument
    @property
    def name(self):
        return self._name

    ## @brief Set the name and attempt to auto-detect the file's data type and
    #  subtype from names following the (input|output|tmp.)TYPE[File] convention
    @name.setter
    def name(self, value):
        self._name = value
        m = re.match(r'(input|output|tmp.)([A-Za-z0-9_]+?)(File)?$', value)
        if m:
            msg.debug("ArgFile name setter matched this: {0}".format(m.groups()))
            if self._type is None:
                dtype = m.group(2).split('_', 1)[0]
                # But DRAW/DESD/DAOD are really just RAW, ESD, AOD in format
                if re.match(r'D(RAW|ESD|AOD)', dtype):
                    dtype = dtype[1:]
                msg.debug("Autoset data type to {0}".format(dtype))
                self._type = dtype
            if self._subtype is None:
                msg.debug("Autoset data subtype to {0}".format(m.group(2)))
                self._subtype = m.group(2)
        else:
            msg.debug("ArgFile name setter did not match against '{0}'".format(value))

    ## @brief Whether this is an auxiliary file
    @property
    def auxiliaryFile(self):
        return self._auxiliaryFile
880
881
    ## @brief Full metadata dictionary for all files (triggers population)
    @property
    def metadata(self):
        self.getMetadata()
        return self._fileMetadata

    ## @brief Total number of events summed over all files
    @property
    def nentries(self):
        return self.getnentries()

    ## @brief Sum the event counts of all files in this instance
    #  @param fast If @c True only cached metadata is used (no population)
    #  @return Total events; @c None if any file's count is missing or not an
    #  int; the string 'UNDEFINED' if any file reports UNDEFINED
    def getnentries(self, fast=False):
        totalEvents = 0
        for fname in self._value:
            events = self.getSingleMetadata(fname=fname, metadataKey='nentries', populate = not fast)
            if events is None:
                msg.debug('Got events=None for file {0} - returning None for this instance'.format(fname))
                return None
            if events == 'UNDEFINED':
                msg.debug('Got events=UNDEFINED for file {0} - returning UNDEFINED for this instance'.format(fname))
                return 'UNDEFINED'
            if not isinstance(events, int):
                msg.warning('Got unexpected events metadata for file {0}: {1!s} - returning None for this instance'.format(fname, events))
                return None
            totalEvents += events

        return totalEvents
910
911
912
    ## @brief Reset cached metadata for the given files (the whole cache is
    #  rebuilt when @c files is empty or the cache does not yet exist), then
    #  re-apply any externally supplied GUIDs
    #  @param files List of filenames to reset (default: reset everything)
    def _resetMetadata(self, files=[]):
        if files == [] or '_fileMetadata' not in dir(self):
            self._fileMetadata = {}
            for fname in self.value:
                self._fileMetadata[fname] = {}
        else:
            for fname in files:
                if fname in self.value:
                    self._fileMetadata[fname] = {}
                elif fname in self._fileMetadata:
                    # File no longer part of this instance - drop its cache entry
                    del self._fileMetadata[fname]
        msg.debug('Metadata dictionary now {0}'.format(self._fileMetadata))

        # If we have the special guid option, then manually try to set GUIDs we find
        if self._guid is not None:
            msg.debug('Now trying to set file GUID metadata using {0}'.format(self._guid))
            for fname, guid in self._guid.items():
                if fname in self._value:
                    self._fileMetadata[fname]['file_guid'] = guid
                else:
                    msg.warning('Explicit GUID {0} was passed for file {1}, but this file is not a member of this instance'.format(guid, fname))
938
939
    ## @brief Return a metadata dictionary for the requested files and keys
    #  @param files Filename or list of filenames (default: all files)
    #  @param metadataKeys Key or list of keys (default: all known keys)
    #  @param maskMetadataKeys Keys to exclude from the request
    #  @param populate If @c True missing values are generated on demand
    #  @param flush If @c True the cache is cleared first
    #  @return Dictionary of {filename: {metadataKey: value}}; unobtainable
    #  values are set to @c None
    def getMetadata(self, files = None, metadataKeys = None, maskMetadataKeys = None, populate = True, flush = False):
        # Normalise the files and keys parameter
        if files is None:
            files = self._value
        elif isinstance(files, str):
            files = (files,)
        msg.debug('getMetadata will examine these files: {0!s}'.format(files))

        if metadataKeys is None:
            metadataKeys = list(self._metadataKeys)
        elif isinstance(metadataKeys, str):
            metadataKeys = [metadataKeys,]
        if maskMetadataKeys is not None:
            metadataKeys = [k for k in metadataKeys if k not in maskMetadataKeys]
        msg.debug('getMetadata will retrieve these keys: {0!s}'.format(metadataKeys))

        if flush is True:
            msg.debug('Flushing cached metadata values')
            self._resetMetadata()

        if populate is True:
            msg.debug('Checking metadata values')
            self._readMetadata(files, metadataKeys)

        metadata = {}
        for fname in files:
            metadata[fname] = {}
            for mdkey in metadataKeys:
                try:
                    metadata[fname][mdkey] = self._fileMetadata[fname][mdkey]
                except KeyError:
                    # This should not happen, unless we skipped populating
                    if populate:
                        msg.error('Did not find metadata key {0!s} for file {1!s} - setting to None'.format(mdkey, fname))
                    metadata[fname][mdkey] = None
        return metadata
981
982
    ## @brief Convenience wrapper returning a single metadata value
    #  @param fname Single filename (must be a str)
    #  @param metadataKey Single key (must be a str)
    #  @param populate If @c True the value is generated on demand
    #  @param flush If @c True the cache is cleared first
    #  @throws trfExceptions.TransformInternalException if fname or metadataKey
    #  is not a string
    def getSingleMetadata(self, fname, metadataKey, populate = True, flush = False):
        if not (isinstance(fname, str) and isinstance(metadataKey, str)):
            raise trfExceptions.TransformInternalException(trfExit.nameToCode('TRF_INTERNAL'),
                                                           'Illegal call to getSingleMetadata function: {0!s} {1!s}'.format(fname, metadataKey))
        md = self.getMetadata(files = fname, metadataKeys = metadataKey, populate = populate, flush = flush)
        return md[fname][metadataKey]
995
996
997
1000 def _readMetadata(self, files, metadataKeys):
1001 msg.debug('Retrieving metadata keys {1!s} for files {0!s}'.format(files, metadataKeys))
1002 for fname in files:
1003 if fname not in self._fileMetadata:
1004 self._fileMetadata[fname] = {}
1005 for fname in files:
1006 # Always try for a simple existence test first before producing misleading error messages
1007 # from metadata populator functions
1008 if '_exists' not in self._fileMetadata[fname]:
1009 self._metadataKeys['_exists'](files)
1010 if self._fileMetadata[fname]['_exists'] is False:
1011 # N.B. A log ERROR message has printed by the existence test, so do not repeat that news here
1012 for key in metadataKeys:
1013 if key != '_exists':
1014 self._fileMetadata[fname][key] = None
1015 else:
1016 # OK, file seems to exist at least...
1017 for key in metadataKeys:
1018 if key not in self._metadataKeys:
1019 msg.debug('Metadata key {0} is unknown for {1}'.format(key, self.__class__.__name__))
1020 self._fileMetadata[fname][key] = 'UNDEFINED'
1021 else:
1022 if key in self._fileMetadata[fname]:
1023 msg.debug('Found cached value for {0}:{1} = {2!s}'.format(fname, key, self._fileMetadata[fname][key]))
1024 else:
1025 msg.debug('No cached value for {0}:{1}. Calling generator function {2} ({3})'.format(fname, key, self._metadataKeys[key].__name__, self._metadataKeys[key]))
1026 try:
1027 # For efficiency call this routine with all files we have
1028 msg.info("Metadata generator called to obtain {0} for {1}".format(key, files))
1029 self._metadataKeys[key](files)
1031 msg.error('Calling {0!s} raised an exception: {1!s}'.format(self._metadataKeys[key].__name__, e))
1032 if key not in self._fileMetadata[fname]:
1033 msg.warning('Call to function {0} for {1} file {2} failed to populate metadata key {3}'.format(self._metadataKeys[key].__name__, self.__class__.__name__, fname, key))
1034 self._fileMetadata[fname][key] = None
1035 msg.debug('Now have {0}:{1} = {2!s}'.format(fname, key, self._fileMetadata[fname][key]))
1036
1037
1038
    ## @brief Set metadata values by hand (no generation, no validity checks)
    #  @param files List of filenames (default: all files in this instance)
    #  @param metadataKeys Dictionary of {metadataKey: value} applied to each file
    #  @note The mutable default for @c metadataKeys is only iterated, never mutated
    def _setMetadata(self, files=None, metadataKeys={}):
        if files is None:
            files = self._value
        for fname in files:
            if fname not in self._fileMetadata:
                self._fileMetadata[fname] = {}
            for k, v in metadataKeys.items():
                msg.debug('Manualy setting {0} for file {1} to {2}'.format(k, fname, v))
                self._fileMetadata[fname][k] = v
1057
1058
1059
1065 def isCached(self, files = None, metadataKeys = None):
1066 msg.debug('Testing for cached values for files {0} and keys {1}'.format(files, metadataKeys))
1067 if files is None:
1068 files = self._value
1069 elif isinstance(files, str):
1070 files = (files,)
1071 if metadataKeys is None:
1072 metadataKeys = list(self._metadataKeys)
1073 elif isinstance(metadataKeys, str):
1074 metadataKeys = (metadataKeys,)
1075
1076 isCachedFlag = True
1077 for fname in files:
1078 for key in metadataKeys:
1079 if key not in self._fileMetadata[fname]:
1080 isCachedFlag = False
1081 break
1082 if isCachedFlag is False:
1083 break
1084
1085 return isCachedFlag
1086
1087
    ## @brief Extract a dataset name from 'dsn#lfn' style filenames and strip
    #  the prefix from the stored values
    #  @param reset If @c True forget any previously known dataset first
    #  @throws trfExceptions.TransformArgException (TRF_ARG_DATASET) if dataset
    #  names are inconsistent, or only some files carry a dataset prefix
    def _getDatasetFromFilename(self, reset = False):
        if reset:
            self._dataset = None
        newValue = []
        for filename in self._value:
            if filename.find('#') > -1:
                (dataset, fname) = filename.split('#', 1)
                newValue.append(fname)
                msg.debug('Current dataset: {0}; New dataset {1}'.format(self._dataset, dataset))
                if self._dataset and (self._dataset != dataset):
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_DATASET'),
                                                              'Found inconsistent dataset assignment in argFile setup: %s != %s' % (self._dataset, dataset))
                self._dataset = dataset
        if len(newValue) == 0:
            # No dsn# prefixes found at all - nothing to strip
            return
        elif len(newValue) != len (self._value):
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_DATASET'),
                                                      'Found partial dataset assignment in argFile setup from {0} (dsn#lfn notation must be uniform for all inputs)'.format(self._value))
        self._value = newValue
1112
1113
    ## @brief Generator function for file sizes
    #  @param files List of filenames to process
    #  @note NOTE(review): results are stored under the 'size' key although
    #  this generator is registered for 'file_size' (which _exists populates);
    #  confirm which key consumers actually read before changing either
    def _getSize(self, files):
        for fname in files:
            if self._urlType == 'posix':
                try:
                    self._fileMetadata[fname]['size'] = os.stat(fname).st_size
                except OSError as e:
                    msg.error('Got exception {0!s} raised while stating file {1}'.format(e, fname))
                    self._fileMetadata[fname]['size'] = None
            else:
                # OK, let's see if ROOT can do it...
                msg.debug('Calling ROOT TFile.GetSize({0})'.format(fname))
                self._fileMetadata[fname]['size'] = ROOTGetSize(fname)
1129
1130
1131
    ## @brief Generator function for a simple integrity check: read the whole
    #  file in 1MB chunks; a complete read sets 'integrity' to True
    #  @param files List of filenames to process
    def _getIntegrity(self, files):
        for fname in files:
            is_binary = False
            # First attempt reads the file as text (default open mode)
            with open(fname) as f:
                try:
                    while True:
                        chunk = len(f.read(1024*1024))
                        msg.debug('Read {0} bytes from {1}'.format(chunk, fname))
                        if chunk == 0:
                            break
                    self._fileMetadata[fname]['integrity'] = True
                except OSError as e:
                    msg.error('Got exception {0!s} raised while checking integrity of file {1}'.format(e, fname))
                    self._fileMetadata[fname]['integrity'] = False
                except UnicodeDecodeError:
                    # Not valid text - retry below in binary mode
                    msg.debug('Problem reading file as unicode, attempting with binary')
                    is_binary = True
            if is_binary:
                with open(fname,'rb') as f:
                    try:
                        while True:
                            chunk = len(f.read(1024*1024))
                            msg.debug('Read {0} bytes from {1}'.format(chunk, fname))
                            if chunk == 0:
                                break
                        self._fileMetadata[fname]['integrity'] = True
                    except OSError as e:
                        msg.error('Got exception {0!s} raised while checking integrity of file {1}'.format(e, fname))
                        self._fileMetadata[fname]['integrity'] = False
1164
1165
    ## @brief Generator function producing a fresh random (uuid4) GUID for
    #  each file, stored uppercased under 'file_guid'
    #  @param files List of filenames to process
    def _generateGUID(self, files):
        for fname in files:
            msg.debug('Generating a GUID for file {0}'.format(fname))
            self._fileMetadata[fname]['file_guid'] = str(uuid.uuid4()).upper()
1173
1174
1175
1180 def _exists(self, files):
1181 import re
1182 msg.debug('Testing existance for {0}'.format(files))
1183 def split_filelist(fn):
1184 if self.io != 'output':
1185 return [fn]
1186 file_split_regex = re.compile(r"(.+)\[(.+)](.+)")
1187 if ('[' in fn) and (']' in fn):
1188 match = file_split_regex.match(fn)
1189 return [f"{match.group(1)}{it}{match.group(3)}" for it in match.group(2).split(',')]
1190 else:
1191 return [fn]
1192 for fname in files:
1193 file_list = split_filelist(fname)
1194 if self._urlType == 'posix':
1195 try:
1196 size = map(lambda fn: os.stat(fn).st_size, file_list)
1197 self._fileMetadata[fname]['file_size'] = sum(size)
1198 self._fileMetadata[fname]['_exists'] = True
1199 msg.debug('POSIX file {0} exists (or all elements of list)'.format(fname))
1200 except OSError as e:
1201 msg.error('Got exception {0!s} raised while stating file {1} (or some element of list) - probably it does not exist'.format(e, fname))
1202 self._fileMetadata[fname]['_exists'] = False
1203 else:
1204 # OK, let's see if ROOT can do it...
1205 msg.debug('Calling ROOT TFile.GetSize on {0} (or elements of list)'.format(fname))
1206 size = map(ROOTGetSize, file_list)
1207 if None in size:
1208 self._fileMetadata[fname]['_exists'] = False
1209 msg.error('Non-POSIX file {0} (or element of list) could not be opened - probably it does not exist'.format(fname))
1210 else:
1211 msg.debug('Non-POSIX file {0} (or all elements of list) exists'.format(fname))
1212 self._fileMetadata[fname]['file_size'] = sum(size)
1213 self._fileMetadata[fname]['_exists'] = True
1214
1215
1216 def __str__(self):
1217 return "{0}={1} (Type {2}, Dataset {3}, IO {4})".format(self.name, self.value, self.type, self.dataset, self.io)
1218
1219
1220
1224 def _mergeArgs(self, argdict, copyArgs=None):
1225 if copyArgs:
1226 myargdict = {}
1227 for arg in copyArgs:
1228 if arg in argdict:
1229 myargdict[arg] = copy.copy(argdict[arg])
1230
1231 else:
1232 myargdict = copy.copy(argdict)
1233 # Never do event count checks for self merging
1234 myargdict['checkEventCount'] = argSubstepBool('False', runarg=False)
1235 newopts = []
1236 if 'athenaopts' in myargdict:
1237 # Need to ensure that "nprocs" is not passed to merger
1238 # and prevent multiple '--threads' options when there are multiple sub-steps in 'athenopts'
1239 for subStep in myargdict['athenaopts'].value:
1240 hasNprocs = False
1241 hasNthreads = False
1242 for opt in myargdict['athenaopts'].value[subStep]:
1243 if opt.startswith('--nprocs'):
1244 hasNprocs = True
1245 continue
1246 # Keep at least one '--threads'
1247 elif opt.startswith('--threads'):
1248 hasNthreads = True
1249 if opt in newopts:
1250 continue
1251 newopts.append(opt)
1252 # If we have hybrid MP+MT job make sure --threads is not passed to merger
1253 if hasNprocs and hasNthreads:
1254 tmpopts = []
1255 for opt in newopts:
1256 if opt.startswith('--threads'):
1257 continue
1258 tmpopts.append(opt)
1259 newopts = tmpopts
1260 myargdict['athenaopts'] = argSubstepList(newopts, runarg=False)
1261 return myargdict
1262
1263
    def __init__(self, value=list(), io = 'output', type=None, splitter=',', runarg=True, multipleOK=None, name=None):
        """YODA file argument: adds LHE-style event count and weight metadata keys."""
        super(argYODAFile, self).__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
                                          name=name)

        # NOTE(review): 'lheSumOfNegWeights' maps to the constant 0 while the
        # other keys map to bound populator methods ('lheSumOfPosWeights' uses
        # self._getWeightedEvents, which fills both pos and neg sums) - confirm
        # the 0 is intentional and not a lost method reference.
        self._metadataKeys.update({
            'nentries': self._getNumberOfEvents,
            'lheSumOfPosWeights': self._getWeightedEvents,
            'lheSumOfNegWeights': 0,
        })
1274
1275 def _getNumberOfEvents(self, files):
1276 msg.debug('Retrieving event count for LHE file {0}'.format(files))
1277 import tarfile
1278 for fname in files:
1279 # Attempt to treat this as a pileup reweighting file
1280 try :
1281 tar = tarfile.open(fname, "r:gz")
1282 lhecount = 0
1283 for untar in tar.getmembers():
1284 fileTXT = tar.extractfile(untar)
1285 if fileTXT is not None :
1286 lines = fileTXT.read().decode("utf-8")
1287 lhecount = lines.count('/event')
1288
1289 self._fileMetadata[fname]['nentries'] = lhecount
1290 except Exception:
1291 msg.debug('Entries is set to None - event count undefined for this LHE')
1292 self._fileMetadata[fname]['nentries'] = -1
1293
1294 def _getWeightedEvents(self, files):
1295 msg.debug('Retrieving weight count for LHE file {0}'.format(files))
1296 import tarfile
1297 import re
1298
1299 for fname in files:
1300 weightPos = 0
1301 weightNeg = 0
1302 try :
1303 tar = tarfile.open(fname, "r:gz")
1304 for untar in tar.getmembers():
1305 fileTXT = tar.extractfile(untar)
1306 next = False
1307 if fileTXT is not None :
1308 for line in fileTXT :
1309 line = line.decode("utf-8")
1310 if next :
1311 try :
1312 w = float(re.sub(' +',' ',line).split(" ")[2])
1313 if w > 0 : weightPos += w
1314 else : weightNeg += abs(w)
1315 except Exception:
1316 pass
1317 next = False
1318 if "<event" in line :
1319 next = True
1320
1321 self._fileMetadata[fname]['lheSumOfPosWeights'] = weightPos
1322 self._fileMetadata[fname]['lheSumOfNegWeights'] = weightNeg
1323 except Exception:
1324 msg.debug('Entries is set to None - negative fraction count undefined for this LHE')
1325 self._fileMetadata[fname]['lheSumOfPosWeights'] = -1
1326 self._fileMetadata[fname]['lheSumOfNegWeights'] = -1
1327
1328
1331 def __init__(self, value = list(), type=None, subtype=None, io = 'output', splitter=',', runarg=True, multipleOK = None,
1332 name=None, executor=list(), mergeTargetSize=-1, auxiliaryFile=False):
1333 super(argAthenaFile, self).__init__(value=value, subtype=subtype, io=io, type=type, splitter=splitter, runarg=runarg,
1334 multipleOK=multipleOK, name=name, executor=executor, mergeTargetSize=mergeTargetSize,
1335 auxiliaryFile=auxiliaryFile)
1336
1337 # Extra metadata known for athena files:
1338 for key in athFileInterestingKeys:
1339 self._metadataKeys[key] = self._getAthInfo
1340
1341
1342 def _callAthInfo(self, files, doAllFiles, retrieveKeys):
1343 if doAllFiles:
1344 myFiles = self._value
1345 else:
1346 myFiles = files
1347 msg.debug('Will retrieve metadata info for {0!s}'.format(myFiles))
1348 aftype = 'POOL'
1349 if self._type.upper() in ('BS', 'RAW'):
1350 aftype = 'BS'
1351 elif self._type.upper() in ('TAG'):
1352 aftype = 'TAG'
1353
1354 # N.B. Could parallelise here
1355 for fname in myFiles:
1356 athFileMetadata = AthenaLiteFileInfo(fname, aftype, retrieveKeys=retrieveKeys)
1357 if athFileMetadata is None:
1358 raise trfExceptions.TransformMetadataException(trfExit.nameToCode('TRF_METADATA_CALL_FAIL'), 'Call to AthenaLiteFileInfo failed')
1359 msg.debug('Setting metadata for file {0} to {1}'.format(fname, athFileMetadata[fname]))
1360 self._fileMetadata[fname].update(athFileMetadata[fname])
1361
1362
1363 def _getAthInfo(self, files):
1364 self._callAthInfo(files, doAllFiles = True, retrieveKeys=athFileInterestingKeys)
1365
1366 @property
1368 desc=super(argAthenaFile, self).prodsysDescription
1369 return desc
1370
1371
1372
1374
1375 integrityFunction = "returnIntegrityOfBSFile"
1376
1377 def _getIntegrity(self, files):
1378 for fname in files:
1379 try:
1380 rc=call(["AtlListBSEvents", "-c", fname], logger=msg, message="Report by AtlListBSEvents: ", timeout=600)
1382 return False
1383 if rc==0:
1384 self._fileMetadata[fname]['integrity'] = True
1385 else:
1386 self._fileMetadata[fname]['integrity'] = False
1387
1388 @property
1390 desc=super(argBSFile, self).prodsysDescription
1391 return desc
1392
1393
1400 def selfMerge(self, output, inputs, counter=0, argdict={}):
1401 msg.debug('selfMerge attempted for {0} -> {1} with {2} (index {3})'.format(inputs, output, argdict, counter))
1402
1403 # First do a little sanity check
1404 for fname in inputs:
1405 if fname not in self._value:
1406 raise trfExceptions.TransformMergeException(trfExit.nameToCode('TRF_FILEMERGE_PROBLEM'),
1407 "File {0} is not part of this agument: {1}".format(fname, self))
1408
1409 from PyJobTransforms.trfExe import bsMergeExecutor, executorConfig
1410
1411
1412 myargdict = self._mergeArgs(argdict)
1413 myargdict['maskEmptyInputs'] = argBool(True)
1414 myargdict['allowRename'] = argBool(True)
1415 myargdict['emptyStubFile'] = argString(inputs[0])
1416
1417 # We need a athenaExecutor to do the merge
1418 # N.B. We never hybrid merge AthenaMP outputs as this would prevent further merging in another
1419 # task (hybrid merged files cannot be further bybrid merged)
1420 myDataDictionary = {'BS_MRG_INPUT' : argBSFile(inputs, type=self.type, io='input'),
1421 'BS_MRG_OUTPUT' : argBSFile(output, type=self.type, io='output')}
1422 myMergeConf = executorConfig(myargdict, myDataDictionary)
1423 myMerger = bsMergeExecutor(name='BSMergeAthenaMP{0}{1}'.format(self._subtype, counter), conf=myMergeConf, exe = 'file_merging',
1424 inData=set(['BS_MRG_INPUT']), outData=set(['BS_MRG_OUTPUT']))
1425 myMerger.doAll(input=set(['BS_MRG_INPUT']), output=set(['BS_MRG_OUTPUT']))
1426
1427 # OK, if we got to here with no exceptions, we're good shape
1428 # Now update our own list of files to reflect the merge
1429 for fname in inputs:
1430 self._value.remove(fname)
1431 self._value.append(output)
1432
1433 msg.debug('Post self-merge files are: {0}'.format(self._value))
1434 self._resetMetadata(inputs + [output])
1435 return myMerger
1436
1437
1438
1441
1442 integrityFunction = "returnIntegrityOfPOOLFile"
1443
1444 # trfValidateRootFile is written in an odd way, so timelimit it here.
1445 @timelimited()
1446 def _getIntegrity(self, files):
1447 for fname in files:
1448 from PyJobTransforms.trfValidateRootFile import checkFile
1449 rc=checkFile(fileName=fname, the_type='event', requireTree=False)
1450 if rc==0:
1451 self._fileMetadata[fname]['integrity'] = True
1452 else:
1453 self._fileMetadata[fname]['integrity'] = False
1454
1455 @property
1457 desc=super(argPOOLFile, self).prodsysDescription
1458 return desc
1459
1460
1465 def selfMerge(self, output, inputs, counter=0, argdict={}):
1466 msg.debug('selfMerge attempted for {0} -> {1} with {2}'.format(inputs, output, argdict))
1467
1468 # First do a little sanity check
1469 for fname in inputs:
1470 if fname not in self._value:
1471 raise trfExceptions.TransformMergeException(trfExit.nameToCode('TRF_FILEMERGE_PROBLEM'),
1472 "File {0} is not part of this agument: {1}".format(fname, self))
1473
1474 from PyJobTransforms.trfExe import athenaExecutor, executorConfig
1475
1476
1477 myargdict = self._mergeArgs(argdict)
1478
1479 # We need a athenaExecutor to do the merge
1480 # N.B. We never hybrid merge AthenaMP outputs as this would prevent further merging in another
1481 # task (hybrid merged files cannot be further bybrid merged)
1482 myDataDictionary = {'POOL_MRG_INPUT' : argPOOLFile(inputs, type=self.type, io='input'),
1483 'POOL_MRG_OUTPUT' : argPOOLFile(output, type=self.type, io='output')}
1484 myMergeConf = executorConfig(myargdict, myDataDictionary)
1485 myMerger = athenaExecutor(name='POOLMergeAthenaMP{0}{1}'.format(self._subtype, counter), conf=myMergeConf,
1486 skeletonCA = 'RecJobTransforms.MergePool_Skeleton',
1487 inData=set(['POOL_MRG_INPUT']), outData=set(['POOL_MRG_OUTPUT']),
1488 disableMT=True, disableMP=True)
1489 myMerger.doAll(input=set(['POOL_MRG_INPUT']), output=set(['POOL_MRG_OUTPUT']))
1490
1491 # OK, if we got to here with no exceptions, we're good shape
1492 # Now update our own list of files to reflect the merge
1493 for fname in inputs:
1494 self._value.remove(fname)
1495 self._value.append(output)
1496
1497 msg.debug('Post self-merge files are: {0}'.format(self._value))
1498 self._resetMetadata(inputs + [output])
1499 return myMerger
1500
1502
1503 integrityFunction = "returnIntegrityOfPOOLFile"
1504
1505
1506 def selfMerge(self, output, inputs, counter=0, argdict={}):
1507 msg.debug('selfMerge attempted for {0} -> {1} with {2}'.format(inputs, output, argdict))
1508
1509 # First do a little sanity check
1510 for fname in inputs:
1511 if fname not in self._value:
1512 raise trfExceptions.TransformMergeException(trfExit.nameToCode('TRF_FILEMERGE_PROBLEM'),
1513 "File {0} is not part of this agument: {1}".format(fname, self))
1514
1515
1516 mySubstepName = 'HITSMergeAthenaMP{0}'.format(counter)
1517 myargdict = self._mergeArgs(argdict)
1518
1519 from PyJobTransforms.trfExe import athenaExecutor, executorConfig
1520 myDataDictionary = {'HITS' : argHITSFile(inputs, type=self.type, io='input'),
1521 'HITS_MRG' : argHITSFile(output, type=self.type, io='output')}
1522 myMergeConf = executorConfig(myargdict, myDataDictionary)
1523 myMerger = athenaExecutor(name = mySubstepName,
1524 skeletonCA = 'SimuJobTransforms.HITSMerge_Skeleton',
1525 conf=myMergeConf,
1526 inData=set(['HITS']), outData=set(['HITS_MRG']),
1527 disableMT=False, disableMP=True)
1528 myMerger.doAll(input=set(['HITS']), output=set(['HITS_MRG']))
1529
1530 # OK, if we got to here with no exceptions, we're good shape
1531 # Now update our own list of files to reflect the merge
1532 for fname in inputs:
1533 self._value.remove(fname)
1534 self._value.append(output)
1535
1536 msg.debug('Post self-merge files are: {0}'.format(self._value))
1537 self._resetMetadata(inputs + [output])
1538 return myMerger
1539
1540
1542
1543 integrityFunction = "returnIntegrityOfPOOLFile"
1544
1545
1546 def selfMerge(self, output, inputs, counter=0, argdict={}):
1547 msg.debug('selfMerge attempted for {0} -> {1} with {2}'.format(inputs, output, argdict))
1548
1549 # First do a little sanity check
1550 for fname in inputs:
1551 if fname not in self._value:
1552 raise trfExceptions.TransformMergeException(trfExit.nameToCode('TRF_FILEMERGE_PROBLEM'),
1553 "File {0} is not part of this agument: {1}".format(fname, self))
1554
1555
1556 mySubstepName = 'EVNT_TRMergeAthenaMP{0}'.format(counter)
1557 myargdict = self._mergeArgs(argdict)
1558
1559 from PyJobTransforms.trfExe import athenaExecutor, executorConfig
1560 myDataDictionary = {'EVNT_TR' : argEVNT_TRFile(inputs, type=self.type, io='input'),
1561 'EVNT_TR_MRG' : argEVNT_TRFile(output, type=self.type, io='output')}
1562 myMergeConf = executorConfig(myargdict, myDataDictionary)
1563 myMerger = athenaExecutor(name = mySubstepName, skeletonFile = 'SimuJobTransforms/skeleton.EVNT_TRMerge.py',
1564 conf=myMergeConf,
1565 inData=set(['EVNT_TR']), outData=set(['EVNT_TR_MRG']),
1566 disableMT=False, disableMP=True)
1567 myMerger.doAll(input=set(['EVNT_TR']), output=set(['EVNT_TR_MRG']))
1568
1569 # OK, if we got to here with no exceptions, we're good shape
1570 # Now update our own list of files to reflect the merge
1571 for fname in inputs:
1572 self._value.remove(fname)
1573 self._value.append(output)
1574
1575 msg.debug('Post self-merge files are: {0}'.format(self._value))
1576 self._resetMetadata(inputs + [output])
1577 return myMerger
1578
1579
1581
1582 integrityFunction = "returnIntegrityOfPOOLFile"
1583
1584
1585 def selfMerge(self, output, inputs, counter=0, argdict={}):
1586 msg.debug('selfMerge attempted for {0} -> {1} with {2}'.format(inputs, output, argdict))
1587
1588 # First do a little sanity check
1589 for fname in inputs:
1590 if fname not in self._value:
1591 raise trfExceptions.TransformMergeException(trfExit.nameToCode('TRF_FILEMERGE_PROBLEM'),
1592 "File {0} is not part of this agument: {1}".format(fname, self))
1593
1594
1595 myargdict = self._mergeArgs(argdict)
1596
1597 from PyJobTransforms.trfExe import athenaExecutor, executorConfig
1598 myDataDictionary = {'RDO' : argHITSFile(inputs, type=self.type, io='input'),
1599 'RDO_MRG' : argHITSFile(output, type=self.type, io='output')}
1600 myMergeConf = executorConfig(myargdict, myDataDictionary)
1601 myMerger = athenaExecutor(name = 'RDOMergeAthenaMP{0}'.format(counter),
1602 skeletonCA = 'SimuJobTransforms.RDOMerge_Skeleton',
1603 conf=myMergeConf,
1604 inData=set(['RDO']), outData=set(['RDO_MRG']),
1605 disableMT=False, disableMP=True)
1606 myMerger.doAll(input=set(['RDO']), output=set(['RDO_MRG']))
1607
1608 # OK, if we got to here with no exceptions, we're good shape
1609 # Now update our own list of files to reflect the merge
1610 for fname in inputs:
1611 self._value.remove(fname)
1612 self._value.append(output)
1613
1614 msg.debug('Post self-merge files are: {0}'.format(self._value))
1615 self._resetMetadata(inputs + [output])
1616 return myMerger
1617
1619
1620 integrityFunction = "returnIntegrityOfPOOLFile"
1621
1622
1623 def selfMerge(self, output, inputs, counter=0, argdict={}):
1624 msg.debug('selfMerge attempted for {0} -> {1} with {2}'.format(inputs, output, argdict))
1625
1626 # First do a little sanity check
1627 for fname in inputs:
1628 if fname not in self._value:
1629 raise trfExceptions.TransformMergeException(trfExit.nameToCode('TRF_FILEMERGE_PROBLEM'),
1630 "File {0} is not part of this agument: {1}".format(fname, self))
1631
1632
1633 mySubstepName = 'EVNTMergeAthenaMP{0}'.format(counter)
1634 myargdict = self._mergeArgs(argdict)
1635
1636 from PyJobTransforms.trfExe import athenaExecutor, executorConfig
1637 myDataDictionary = {'EVNT' : argEVNTFile(inputs, type=self.type, io='input'),
1638 'EVNT_MRG' : argEVNTFile(output, type=self.type, io='output')}
1639 myMergeConf = executorConfig(myargdict, myDataDictionary)
1640 myMerger = athenaExecutor(name = mySubstepName, skeletonCA = 'EvgenJobTransforms.EVNTMerge_Skeleton',
1641 conf=myMergeConf,
1642 inData=set(['EVNT']), outData=set(['EVNT_MRG']),
1643 disableMT=False, disableMP=True)
1644 myMerger.doAll(input=set(['EVNT']), output=set(['EVNT_MRG']))
1645
1646 # OK, if we got to here with no exceptions, we're good shape
1647 # Now update our own list of files to reflect the merge
1648 for fname in inputs:
1649 self._value.remove(fname)
1650 self._value.append(output)
1651
1652 msg.debug('Post self-merge files are: {0}'.format(self._value))
1653 self._resetMetadata(inputs + [output])
1654 return myMerger
1655
1656
1657
1659
1660 integrityFunction = "returnIntegrityOfHISTFile"
1661
1662 def __init__(self, value=list(), io = 'output', type=None, subtype=None, splitter=',', runarg=True, countable=True, multipleOK = None,
1663 name=None, auxiliaryFile=False):
1664 super(argHISTFile, self).__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1665 name=name, auxiliaryFile=auxiliaryFile)
1666
1667 # Make events optional for HISTs (can be useful for HIST_AOD, HIST_ESD before hist merging)
1668 if countable:
1669 self._metadataKeys.update({
1670 'nentries': self._getNumberOfEvents
1671 })
1672
1673
1674 def _getIntegrity(self, files):
1675 for fname in files:
1676 self._fileMetadata[fname]['integrity'] = 'UNDEFINED'
1677
1678
1679 def _getNumberOfEvents(self, files):
1680 for fname in files:
1681 try:
1682 self._fileMetadata[fname]['nentries'] = HISTEntries(fname)
1684 msg.error('Timeout counting events for {0}'.format(fname))
1685
1686 @property
1688 desc=super(argHISTFile, self).prodsysDescription
1689 return desc
1690
1691
1692
1695
1696 integrityFunction = "returnIntegrityOfNTUPFile"
1697
1698
1701 def __init__(self, value=list(), io = 'output', type=None, subtype=None, splitter=',', treeNames=None, runarg=True, multipleOK = None,
1702 name=None, mergeTargetSize=-1, auxiliaryFile=False):
1703 super(argNTUPFile, self).__init__(value=value, io=io, type=type, subtype=subtype, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1704 name=name, mergeTargetSize=mergeTargetSize, auxiliaryFile=auxiliaryFile)
1705 self._treeNames=treeNames
1706
1707 self._metadataKeys.update({
1708 'nentries': self._getNumberOfEvents,
1709 'file_guid': self._generateGUID,
1710 'integrity': self._getIntegrity,
1711 })
1712
1713 if name and 'NTUP_PILEUP' in name:
1714 self._metadataKeys.update({
1715 'sumOfWeights': self._getNumberOfEvents,
1716 })
1717
1718 def _getNumberOfEvents(self, files):
1719 msg.debug('Retrieving event count for NTUP files {0}'.format(files))
1720 if self._treeNames is None:
1721 for fname in files:
1722 # Attempt to treat this as a pileup reweighting file
1723 myPRWEntries = PRWEntries(fileName=fname)
1724 if myPRWEntries is not None:
1725 self._fileMetadata[fname]['nentries'] = myPRWEntries
1726 if self.name and 'NTUP_PILEUP' in self.name:
1727 myPRWEntries = PRWEntries(fileName=fname, integral=True)
1728 self._fileMetadata[fname]['sumOfWeights'] = myPRWEntries
1729 else:
1730 # Attempt to treat this as a PHYSVAL file
1731 myPHYSVALEntries = PHYSVALEntries(fileName=fname)
1732 if myPHYSVALEntries is not None:
1733 self._fileMetadata[fname]['nentries'] = myPHYSVALEntries
1734 if self.name and 'NTUP_PHYSVAL' in self.name:
1735 myPHYSVALEntries = PHYSVALEntries(fileName=fname, integral=True)
1736 self._fileMetadata[fname]['sumOfWeights'] = myPHYSVALEntries
1737 else:
1738 msg.debug('treeNames is set to None - event count undefined for this NTUP')
1739 self._fileMetadata[fname]['nentries'] = 'UNDEFINED'
1740 else:
1741 for fname in files:
1742 try:
1743 self._fileMetadata[fname]['nentries'] = NTUPEntries(fileName=fname, treeNames=self._treeNames)
1745 msg.error('Timeout counting events for {0}'.format(fname))
1746
1747
1748 def _getIntegrity(self, files):
1749 for fname in files:
1750 from PyJobTransforms.trfValidateRootFile import checkFile
1751 rc=checkFile(fileName=fname, the_type='basket', requireTree=False)
1752 if rc==0:
1753 self._fileMetadata[fname]['integrity'] = True
1754 else:
1755 self._fileMetadata[fname]['integrity'] = False
1756
1757
1758 def selfMerge(self, output, inputs, counter=0, argdict={}):
1759 msg.debug('selfMerge attempted for {0} -> {1} with {2}'.format(inputs, output, argdict))
1760
1761 # First do a little sanity check
1762 for fname in inputs:
1763 if fname not in self._value:
1764 raise trfExceptions.TransformMergeException(trfExit.nameToCode('TRF_FILEMERGE_PROBLEM'),
1765 "File {0} is not part of this agument: {1}".format(fname, self))
1766
1767 from PyJobTransforms.trfExe import NTUPMergeExecutor, executorConfig
1768
1769
1770 myargdict = self._mergeArgs(argdict)
1771
1772 # We need a NTUPMergeExecutor to do the merge
1773 myDataDictionary = {'NTUP_MRG_INPUT' : argNTUPFile(inputs, type=self.type, io='input'),
1774 'NYUP_MRG_OUTPUT' : argNTUPFile(output, type=self.type, io='output')}
1775 myMergeConf = executorConfig(myargdict, myDataDictionary)
1776 myMerger = NTUPMergeExecutor(name='NTUPMergeAthenaMP{0}{1}'.format(self._subtype, counter), conf=myMergeConf,
1777 inData=set(['NTUP_MRG_INPUT']), outData=set(['NTUP_MRG_OUTPUT']))
1778 myMerger.doAll(input=set(['NTUP_MRG_INPUT']), output=set(['NYUP_MRG_OUTPUT']))
1779
1780 # OK, if we got to here with no exceptions, we're good shape
1781 # Now update our own list of files to reflect the merge
1782 for fname in inputs:
1783 self._value.remove(fname)
1784 self._value.append(output)
1785
1786 msg.debug('Post self-merge files are: {0}'.format(self._value))
1787 self._resetMetadata(inputs + [output])
1788 return myMerger
1789
1790 @property
1792 desc=super(argNTUPFile, self).prodsysDescription
1793 return desc
1794
1795
1796
1797
1799 def _getIntegrity(self, files):
1800 for fname in files:
1801 # bz2 only supports 'with' from python 2.7
1802 try:
1803 f = bz2.BZ2File(fname, 'r')
1804 while True:
1805 chunk = len(f.read(1024*1024))
1806 msg.debug('Read {0} bytes from {1}'.format(chunk, fname))
1807 if chunk == 0:
1808 break
1809 self._fileMetadata[fname]['integrity'] = True
1810 f.close()
1811 except OSError as e:
1812 msg.error('Got exception {0!s} raised while checking integrity of file {1}'.format(e, fname))
1813 self._fileMetadata[fname]['integrity'] = False
1814
1815
1816 @property
1818 desc=super(argBZ2File, self).prodsysDescription
1819 return desc
1820
1821
1822
1824 def __init__(self, value=list(), io = 'output', type=None, splitter=',', runarg=True, multipleOK=None, name=None):
1825 super(argFTKIPFile, self).__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
1826 name=name)
1827 self._metadataKeys.update({
1828 'nentries': self._getNumberOfEvents
1829 })
1830
1831 def _getNumberOfEvents(self, files):
1832 for fname in files:
1833 try:
1834 eventCount = 0
1835 f = bz2.BZ2File(fname, 'r')
1836 for line in f:
1837 if line.startswith('F'):
1838 eventCount += 1
1839 self._fileMetadata[fname]['nentries'] = eventCount
1840 except OSError as e:
1841 msg.error('Event count for file {0} failed: {1!s}'.format(fname, e))
1842 self._fileMetadata[fname]['nentries'] = None
1843
1844 @property
1846 desc=super(argFTKIPFile, self).prodsysDescription
1847 return desc
1848
1849
1852 def __init__(self, value=list(), io = 'output', type='txt_evt', splitter=',', runarg=True, multipleOK=None, name=None):
1853 super(argHepEvtAsciiFile, self).__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg,
1854 multipleOK=multipleOK, name=name)
1855 self._metadataKeys.update({
1856 'nentries': self._getNumberOfEvents
1857 })
1858
1859 def _getNumberOfEvents(self, files):
1860 for fname in files:
1861 try:
1862 eventCount = 0
1863
1864 if '.tar.gz' in fname or '.tgz' in fname:
1865 import tarfile
1866 with tarfile.open(fname, "r:gz") as tar:
1867 for untar in tar.getmembers():
1868 fileTXT = tar.extractfile(untar)
1869 if fileTXT is not None:
1870 # Iterate line-by-line to avoid memory explosion
1871 for aline in fileTXT:
1872 if aline.startswith(b'E '):
1873 eventCount += 1
1874 elif '.gz' in fname:
1875 import gzip
1876 with gzip.open(fname,'rb') as gzin:
1877 for aline in gzin:
1878 if aline.startswith(b'E '):
1879 eventCount += 1
1880 else:
1881 # Assume uncompressed
1882 with open(fname,'r') as infile:
1883 for aline in infile:
1884 if aline.startswith('E '):
1885 eventCount += 1
1886 self._fileMetadata[fname]['nentries'] = eventCount
1887 except OSError as e:
1888 msg.error('Event count for file {0} failed: {1!s}'.format(fname, e))
1889 self._fileMetadata[fname]['nentries'] = 'UNDEFINED'
1890
1891
    def __init__(self, value=list(), io = 'output', type=None, splitter=',', runarg=True, multipleOK=None, name=None):
        """LHE tarball file argument: adds event count and weight-sum metadata keys."""
        super(argLHEFile, self).__init__(value=value, io=io, type=type, splitter=splitter, runarg=runarg, multipleOK=multipleOK,
                                         name=name)

        # NOTE(review): 'lheSumOfNegWeights' maps to the constant 0 while the
        # other keys map to bound populator methods ('lheSumOfPosWeights' uses
        # self._getWeightedEvents, which fills both sums) - confirm the 0 is
        # intentional and not a lost method reference.
        self._metadataKeys.update({
            'nentries': self._getNumberOfEvents,
            'lheSumOfPosWeights': self._getWeightedEvents,
            'lheSumOfNegWeights': 0,
        })
1902
1903 def _getNumberOfEvents(self, files):
1904 msg.debug('Retrieving event count for LHE file {0}'.format(files))
1905 import tarfile
1906 for fname in files:
1907 # Decompress this as we read
1908 try :
1909 tar = tarfile.open(fname, "r:gz")
1910 lhecount = 0
1911 for untar in tar.getmembers():
1912 fileTXT = tar.extractfile(untar)
1913 if fileTXT is not None :
1914 lines = fileTXT.read().decode("utf-8")
1915 lhecount = lines.count('/event')
1916
1917 self._fileMetadata[fname]['nentries'] = lhecount
1918 except Exception:
1919 msg.debug('Entries is set to None - event count undefined for this LHE')
1920 self._fileMetadata[fname]['nentries'] = 'UNDEFINED'
1921
1922 def _getWeightedEvents(self, files):
1923 msg.debug('Retrieving weight count for LHE file {0}'.format(files))
1924 import tarfile
1925 import re
1926
1927 for fname in files:
1928 weightPos = 0
1929 weightNeg = 0
1930 try :
1931 tar = tarfile.open(fname, "r:gz")
1932 for untar in tar.getmembers():
1933 fileTXT = tar.extractfile(untar)
1934 next = False
1935 if fileTXT is not None :
1936 lines = fileTXT.readlines()
1937 for line in lines :
1938 if next :
1939 try :
1940 w = float(re.sub(' +',' ',line).split(" ")[2])
1941 if w > 0 : weightPos += w
1942 else : weightNeg += abs(w)
1943 except Exception:
1944 pass
1945 next = False
1946 if "<event" in line :
1947 next = True
1948
1949 self._fileMetadata[fname]['lheSumOfPosWeights'] = weightPos
1950 self._fileMetadata[fname]['lheSumOfNegWeights'] = weightNeg
1951 except Exception:
1952 msg.debug('Entries is set to None - negative fraction count undefined for this LHE')
1953 self._fileMetadata[fname]['lheSumOfPosWeights'] = 'UNDEFINED'
1954 self._fileMetadata[fname]['lheSumOfNegWeights'] = 'UNDEFINED'
1955
1956
1961
1962
    def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', separator = ':'):
        """Substep argument: a value that can differ per transform substep.

        @param defaultSubstep: substep key used when a value carries no prefix
        @param separator: character separating the substep prefix from the value
        """
        # These must be set before the base __init__, which parses 'value'
        # through the 'value' setter and needs them
        self._defaultSubstep = defaultSubstep
        self._separator = separator
        super(argSubstep, self).__init__(value, runarg, name)
1969
    # Reset getter
    @property
    def value(self):
        """The parsed substep dictionary ({substep name: value})."""
        return self._value

    # The default setter for the substep class
    @value.setter
    def value(self, value):
        """Parse a string, list/tuple of strings, or ready-made dict into the substep dict.

        @raises trfExceptions.TransformArgException: for unparseable input types
        """
        msg.debug('Attempting to set argSubstep from {0!s} (type {1}'.format(value, type(value)))
        if value is None:
            self._value = {}
        elif isinstance(value, str):
            self._value = dict(self._parseStringAsSubstep(value))
        elif isinstance(value, (list, tuple)):
            # This is a list of strings to parse, so we go through them one by one
            self._value = {}
            for item in value:
                if not isinstance(item, str):
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Failed to convert list item {0!s} to substep (should be a string)'.format(item))
                # Later items overwrite earlier ones for the same substep key
                self._value.update(dict(self._parseStringAsSubstep(item)))
        elif isinstance(value, dict):
            # Trusted as already being {substep: value}
            self._value = value
        else:
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.format(value, type(value)))
1994
1995
1996
2001 def _parseStringAsSubstep(self, string):
2002 subStepMatch = re.match(r'([a-zA-Z0-9,]+)' + self._separator + r'(.*)', string)
2003 subStepList = []
2004 if subStepMatch:
2005 subStep = subStepMatch.group(1).split(',')
2006 subStepValue = subStepMatch.group(2)
2007 else:
2008 subStep = [self._defaultSubstep]
2009 subStepValue = string
2010 msg.debug('Parsed {0} as substep {1}, argument {2}'.format(string, subStep, subStepValue))
2011 for step in subStep:
2012 subStepList.append((step, subStepValue))
2013 return subStepList
2014
2015
2016
2021 def returnMyValue(self, name=None, substep=None, first=False, exe=None):
2022 if exe:
2023 name = exe.name
2024 substep = exe.substep
2025 first = exe.conf.firstExecutor
2026
2027 name = commonExecutorStepName(name)
2028
2029 value = None
2030
2031 if name in self._value:
2032 value = self._value[name]
2033 elif substep in self._value:
2034 value = self._value[substep]
2035 elif first and 'first' in self._value:
2036 value = self._value['first']
2037 elif 'default' in self._value:
2038 value = self._value['default']
2039
2040
2048 if 'all' in self._value:
2049 if value is None:
2050 value = self._value['all']
2051 elif isinstance(value, list):
2052 value = self._value['all'] + value
2053
2054 msg.debug('From substep argument {myvalue} picked value "{value}" for {name}, {substep}, first={first}'.format(myvalue=self._value, value=value, name=name, substep=substep, first=first))
2055
2056 return value
2057
2058 @property
2060 desc = {'type': 'substep', 'substeptype': 'str', 'separator': self._separator,
2061 'default': self._defaultSubstep}
2062 return desc
2063
2064
2071
2072
    def __init__(self, value = None, runarg = True, name = None, defaultSubstep = 'all', splitter = None, separator=':'):
        """Substep list argument: each substep maps to a list of string values.

        @param splitter: optional character used to split each value into list
                         elements (None => each value becomes a one-element list)
        """
        # Must be set before the base __init__, whose 'value' setter parses
        # 'value' via _parseStringAsSubstep and needs the splitter
        self._splitter = splitter
        super(argSubstepList, self).__init__(value, runarg, name, defaultSubstep, separator)
2078
2079
    # Reset getter
    @property
    def value(self):
        """The parsed substep dictionary ({substep name: list of values})."""
        return self._value
2084
2085 @property
2087 desc = {'type': 'substep', 'substeptype': 'list', 'listtype': 'str',
2088 'separator': self._separator,
2089 'default': self._defaultSubstep}
2090 return desc
    @value.setter
    def value(self, value):
        """Parse a string, list/tuple of strings, or dict into {substep: [values]}.

        @raises trfExceptions.TransformArgException: for unparseable input types
        """
        msg.debug('Attempting to set argSubstep from {0!s} (type {1}'.format(value, type(value)))
        if value is None:
            self._value = {}
        elif isinstance(value, str):
            # NOTE(review): unlike the list branch below, a repeated substep in
            # a single string overwrites instead of extending - confirm intended
            self._value = dict(self._parseStringAsSubstep(value))
        elif isinstance(value, (list, tuple)):
            # This is a list of strings to parse
            self._value = {}
            for item in value:
                if not isinstance(item, str):
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Failed to convert list item {0!s} to substep (should be a string)'.format(item))
                subStepList = self._parseStringAsSubstep(item)
                for subStep in subStepList:
                    # Repeated substeps accumulate their value lists
                    if subStep[0] in self._value:
                        self._value[subStep[0]].extend(subStep[1])
                    else:
                        self._value[subStep[0]] = subStep[1]
        elif isinstance(value, dict):
            # Validate the dictionary shape before accepting it wholesale
            for k, v in value.items():
                if not isinstance(k, str):
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary key {0!s} for substep is not a string'.format(k))
                if not isinstance(v, list):
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary value {0!s} for substep is not a list'.format(v))
            self._value = value
        else:
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.format(value, type(value)))
2119
2120
2122 def _parseStringAsSubstep(self, value):
2123 subStepList = super(argSubstepList, self)._parseStringAsSubstep(value)
2124 if self._splitter:
2125 subStepList = [(s[0], s[1].split(self._splitter)) for s in subStepList]
2126 else:
2127 subStepList = [(s[0], [s[1]]) for s in subStepList]
2128 return subStepList
2129
2130
2132
    # Reset getter
    @property
    def value(self):
        """Return the raw substep dictionary (substep name -> value)."""
        return self._value
2137
2138 @property
2140 desc = {'type': 'substep', 'substeptype': 'str', 'separator': self._separator,
2141 'default': self._defaultSubstep}
2142 return desc
2143
2144 @value.setter
2145 def value(self, value):
2146 msg.debug('Attempting to set argSubstep from {0!s} (type {1}'.format(value, type(value)))
2147 if value is None:
2148 self._value = {}
2149 elif isinstance(value, str):
2150 subStepList = self._parseStringAsSubstep(value)
2151 self._value = dict([(subStep[0], subStep[1]) for subStep in subStepList])
2152 elif isinstance(value, (list, tuple)):
2153 # This is a list of strings to parse
2154 self._value = {}
2155 for item in value:
2156 if not isinstance(item, str):
2157 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Failed to convert list item {0!s} to substep (should be a string)'.format(item))
2158 subStepList = self._parseStringAsSubstep(item)
2159 for subStep in subStepList:
2160 self._value[subStep[0]] = subStep[1]
2161 elif isinstance(value, dict):
2162 for k, v in value.items():
2163 if not isinstance(k, str):
2164 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary key {0!s} for substep is not a string'.format(k))
2165 if not isinstance(v, str):
2166 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary value {0!s} for substep is not a string'.format(v))
2167 self._value = value
2168 else:
2169 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.format(value, type(value)))
2170
2171
2173
    # Reset getter
    @property
    def value(self):
        """Return the raw substep dictionary (substep name -> bool)."""
        return self._value
2178
2179 @property
2181 desc = {'type': 'substep', 'substeptype': 'bool', 'separator': self._separator,
2182 'default': self._defaultSubstep}
2183 return desc
2184
2185 @value.setter
2186 def value(self, value):
2187 msg.debug('Attempting to set argSubstep from {0!s} (type {1})'.format(value, type(value)))
2188 if value is None:
2189 self._value = {self._defaultSubstep: True}
2190 elif isinstance(value, bool):
2191 self._value = {self._defaultSubstep: value}
2192 elif isinstance(value, str):
2193 subStepList = self._parseStringAsSubstep(value)
2194 self._value = dict([(subStep[0], strToBool(subStep[1])) for subStep in subStepList])
2195 elif isinstance(value, (list, tuple)):
2196 # This is a list of strings to parse
2197 self._value = {}
2198 for item in value:
2199 if not isinstance(item, str):
2200 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Failed to convert list item {0!s} to substep (should be a string)'.format(item))
2201 subStepList = self._parseStringAsSubstep(item)
2202 for subStep in subStepList:
2203 self._value[subStep[0]] = strToBool(subStep[1])
2204 elif isinstance(value, dict):
2205 for k, v in value.items():
2206 if not isinstance(k, str):
2207 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary key {0!s} for substep is not a string'.format(k))
2208 if not isinstance(v, bool):
2209 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary value {0!s} for substep is not a bool'.format(v))
2210 self._value = value
2211 else:
2212 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.format(value, type(value)))
2213
2214
2215
2217
    # Reset getter
    @property
    def value(self):
        """Return the raw substep dictionary (substep name -> int)."""
        return self._value
2222
2223 @property
2225 desc = {'type': 'substep', 'substeptype': 'int', 'separator': self._separator,
2226 'default': self._defaultSubstep}
2227 return desc
2228
2229 @value.setter
2230 def value(self, value):
2231 msg.debug('Attempting to set argSubstep from {0!s} (type {1}'.format(value, type(value)))
2232 try:
2233 if value is None:
2234 self._value = {}
2235 elif isinstance(value, int):
2236 self._value = {self._defaultSubstep: value}
2237 elif isinstance(value, str):
2238 subStepList = self._parseStringAsSubstep(value)
2239 self._value = dict([(subStep[0], int(subStep[1])) for subStep in subStepList])
2240 elif isinstance(value, (list, tuple)):
2241 # This is a list of strings to parse
2242 self._value = {}
2243 for item in value:
2244 if not isinstance(item, str):
2245 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Failed to convert list item {0!s} to substep (should be a string)'.format(item))
2246 subStepList = self._parseStringAsSubstep(item)
2247 for subStep in subStepList:
2248 self._value[subStep[0]] = int(subStep[1])
2249 elif isinstance(value, dict):
2250 for k, v in value.items():
2251 if not isinstance(k, str):
2252 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary key {0!s} for substep is not a string'.format(k))
2253 if not isinstance(v, int):
2254 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Dictionary value {0!s} for substep is not an int'.format(v))
2255 self._value = value
2256 else:
2257 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.format(value, type(value)))
2258 except ValueError:
2259 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Failed to convert substep value {0} to int'.format(value))
2260
2261
2262
2263
2265
2266 def __init__(self, value=None, min=None, max=None, runarg=True, name=None):
2267 self._min = min
2268 self._max = max
2269 super(argSubstepFloat, self).__init__(value = value, runarg = runarg, name=name)
2270
2271 @property
2273 desc = {'type': 'substep', 'substeptype': 'float', 'separator': self._separator,
2274 'default': self._defaultSubstep}
2275 if self._min:
2276 desc['min'] = self._min
2277 if self._max:
2278 desc['max'] = self._max
2279 return desc
2280
2281
    # Reset getter
    @property
    def value(self):
        """Return the raw substep dictionary (substep name -> float)."""
        return self._value
2286
2287 @value.setter
2288 def value(self, value):
2289 msg.debug('Attempting to set argSubstep from {0!s} (type {1}'.format(value, type(value)))
2290 try:
2291 if value is None:
2292 self._value = {}
2293 elif isinstance(value, float):
2294 self._value = {self._defaultSubstep: value}
2295 elif isinstance(value, str):
2296 subStepList = self._parseStringAsSubstep(value)
2297 self._value = dict([(subStep[0], float(subStep[1])) for subStep in subStepList])
2298 elif isinstance(value, (list, tuple)):
2299 # This is a list of strings to parse
2300 self._value = {}
2301 for item in value:
2302 if not isinstance(item, str):
2303 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
2304 'Failed to convert list item {0!s} to substep (should be a string)'.format(item))
2305 subStepList = self._parseStringAsSubstep(item)
2306 for subStep in subStepList:
2307 self._value[subStep[0]] = float(subStep[1])
2308 elif isinstance(value, dict):
2309 for k, v in value.items():
2310 if not isinstance(k, str):
2311 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
2312 'Dictionary key {0!s} for substep is not a string'.format(k))
2313 if not isinstance(v, float):
2314 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
2315 'Dictionary value {0!s} for substep is not an float'.format(v))
2316 self._value = value
2317 else:
2318 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
2319 'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.format(value, type(value)))
2320 # Now do min/max checks
2321 for my_float in self._value.values():
2322 if (self._min is not None and my_float < self._min) or (self._max is not None and my_float > self._max):
2323 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_OUT_OF_RANGE'),
2324 'argFloat value out of range: {0} is not between {1} and {2}'.format(my_float, self._min, self._max))
2325 except ValueError as e:
2326 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
2327 'Failed to convert substep value {0} to float: {1}'.format(value, e))
2328
2329
2330
    # This singleton is where we define some aliases for common production
    # usecases of steering.
    # (N.B. the attribute name 'steeringAlises' is misspelt, but it is referenced
    #  by name below and potentially by clients, so it is kept as-is.)
    # "no" - a convenience null option for production managers, does nothing
    # "doRDO_TRIG" - run split trigger for Reco_tf and friends
    # "doOverlay" - run event overlay on presampled RDOs instead of standard HITtoRDO digitization
    # "doFCtoDAOD" - run the FastChain transform, including the Derivation step, to process data from EVNT to DAOD
    # "afterburn" - run the B decay afterburner for event generation
    # "doRAWtoALL" - (deprecated) produce all DESDs and AODs directly from bytestream
    # "doTRIGtoALL" - (deprecated) produce AODs directly from trigger RDOs
    steeringAlises = {
        'no': {},
        'doRDO_TRIG': {'RAWtoALL': [('in', '-', 'RDO'), ('in', '+', 'RDO_TRIG'), ('in', '-', 'BS')]},
        'doOverlay': {'HITtoRDO': [('in', '-', 'HITS'), ('out', '-', 'RDO'), ('out', '-', 'RDO_FILT')],
                      'Overlay': [('in', '+', ('HITS', 'RDO_BKG')), ('out', '+', 'RDO')]},
        'doFCtoDAOD': {'Derivation': [('in', '-', 'EVNT')]},
        'afterburn': {'generate': [('out', '-', 'EVNT')]},
        'doRAWtoALL': {},
        'doTRIGtoALL': {}
    }
2351
    # Reset getter
    @property
    def value(self):
        """Return the expanded steering dictionary (substep -> list of (in|out, +|-, TYPE) tuples)."""
        return self._value

    # This argument gets dumped in a special way, using an alias directly
    # instead of the expanded value
    @property
    def dumpvalue(self):
        """Return the original, unexpanded setter input (e.g. the alias string(s) as given)."""
        return self._dumpvalue
2362
2363 @property
2365 desc = {'type': 'substep', 'substeptype': 'steering', 'listtype': 'str', 'separator': self._separator,
2366 'default': self._defaultSubstep}
2367 return desc
2368
2369
    @value.setter
    def value(self, value):
        """Set the steering dictionary from an alias, a steering string, a list of these, or a prepared dict.

        Strings are either a known alias key of steeringAlises, or of the form
        'substep:in|out(+|-)TYPE[,...]'.  The original, unexpanded input is kept
        in _dumpvalue for serialisation (see dumpvalue).
        @throws trfExceptions.TransformArgException on unparsable input
        """
        msg.debug('Attempting to set argSubstepSteering from {0!s} (type {1})'.format(value, type(value)))
        if value is None:
            self._value = {}
            self._dumpvalue = [""]
        elif isinstance(value, dict):
            # OK, this should be the direct setable dictionary - but do a check of that
            for k, v in value.items():
                if not isinstance(k, str) or not isinstance(v, list):
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                              'Failed to convert dict {0!s} to argSubstepSteering'.format(value))
                for subv in v:
                    if not isinstance(subv, (list, tuple)) or len(subv) != 3 or subv[0] not in ('in', 'out') or subv[1] not in ('+', '-'):
                        raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                                  'Failed to convert dict {0!s} to argSubstepSteering'.format(value))
            self._value = value
            # Note we are a little careful here to never reset the dumpvalue - this is
            # because when processing the _list_ of steering arguments down to a single
            # multi-valued argument we re-call value() with an expanded dictionary and
            # one can naively reset dumpvalue by mistake
            self._dumpvalue = getattr(self, "_dumpvalue", value)
        elif isinstance(value, (str, list, tuple)):
            if isinstance(value, str):
                value = [value,]
            # Only set if not already recorded - see the note above
            self._dumpvalue = getattr(self, "_dumpvalue", value)
            # Now we have a list of strings to parse
            self._value = {}
            for item in value:
                if not isinstance(item, str):
                    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                              'Failed to convert list item {0!s} to substep (should be a string)'.format(item))
                if item in argSubstepSteering.steeringAlises:
                    # Aliases expand to a prepared dictionary; merge into any existing steering
                    msg.debug("Found value {0} in steeringAlises ({1})".format(item, argSubstepSteering.steeringAlises[item]))
                    for substep, steerlist in argSubstepSteering.steeringAlises[item].items():
                        if substep in self._value:
                            self._value[substep].extend(steerlist)
                        else:
                            self._value[substep] = steerlist
                else:
                    subStepList = self._parseStringAsSubstep(item)
                    self._value.update(dict([(subStep[0], self._parseSteeringString(subStep[1])) for subStep in subStepList]))
        else:
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                      'Setter value {0!s} (type {1}) for substep argument cannot be parsed'.format(value, type(value)))
2417
2418 def _parseSetterString(self, string):
2419 if string in argSubstepSteering.steeringAlises:
2420 return argSubstepSteering.steeringAlises[string]
2421
2422 def _parseSteeringString(self, ivalue):
2423 retvalue = []
2424 for subvalue in ivalue.split(','):
2425 matchedParts = re.match(r'(in|out)(\+|\-)([A-Z_]+)$', subvalue)
2426 if not matchedParts:
2427 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
2428 'Failed to convert string {0!s} to argSubstepSteering'.format(subvalue))
2429 retvalue.append((matchedParts.group(1), matchedParts.group(2), matchedParts.group(3)))
2430 return retvalue
2431
2432
2433
2435 @property
2436 def value(self):
2437 return self._value
2438
2439 @value.setter
2440 def value(self, value):
2441 msg.debug('Attempting to set argSubstepConditions from {0!s} (type {1}'.format(value, type(value)))
2442 # super().value = value workaround:
2443 super(self.__class__, self.__class__).value.fset(self, value)
2444
2445 current = None
2446 for k, v in self._value.items():
2447 if "CurrentMC" == v:
2448 if current is None:
2449 current = self._amiLookUp(getAMIClient())
2450 self._value[k] = current
2451
2452 def _amiLookUp(self, client):
2453 cmd = "COMAGetGlobalTagNameByCurrentState --state=CurrentMC"
2454 return str(client.execute(cmd, format = 'dom_object').get_rows().pop()['globalTag'])
2455
2456 @property
2458 desc = {'type': 'substep', 'substeptype': 'str', 'separator': self._separator,
2459 'default': self._defaultSubstep}
2460 return desc
2461
2462
class trfArgParser(argparse.ArgumentParser):

    ## @brief Subclass of argparse.ArgumentParser keeping extra per-argument
    #  bookkeeping (help strings, argument classes, groups and aliases) used
    #  elsewhere for prodsys descriptions and signature files.
    def __init__(self, *args, **kwargs):
        """Subclassing argparse - all arguments are passed through to argparse.ArgumentParser."""
        self._helpString = {}  # argument name -> help text (or None)
        self._argClass = {}    # argument name -> argument class (from an action factory or 'type')
        self._argGroups = {}   # group name -> argparse argument group
        # NOTE(review): self._argKeyGroups (argument name -> group name) is read by
        # add_argument() and the prodsys description code - presumably initialised
        # here in the full source; confirm against the complete file.
        self._argAlias = {}    # alias -> canonical argument name
        super(trfArgParser, self).__init__(*args, **kwargs)
2477
2478 def add_argument(self, *args, **kwargs):
2479 argName = args[0].lstrip('-')
2480 msg.debug('Found arg name {0}'.format(argName))
2481
2482 # Ban arguments with hyphens as they cause trouble in signature files and then
2483 # AMI tag definitions because of the auto-translation to underscores in argparse
2484 if '-' in argName:
2485 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_ERROR'),
2486 'Transform arguments may not use hyphens (use camelCase or underscore')
2487
2488 # Prevent a crash if this argument already exists (there are valid use cases for 'grabbing' an
2489 # argument, so this is DEBUG, not WARNING)
2490 if argName in self._argClass:
2491 msg.debug('Double definition of argument {0} - ignored'.format(argName))
2492 return
2493
2494 # if there is a help function defined for the argument then populate the helpString dict
2495 if 'help' in kwargs:
2496 self._helpString[argName] = kwargs['help'] # if the help option is present for the argument then put it into the helpString dict key = argument name, value = help
2497 else:
2498 self._helpString[argName] = None
2499 if 'action' in kwargs and 'factory' in dir(kwargs['action']):
2500 self._argClass[argName] = kwargs['action'].factory
2501 elif 'type' in kwargs:
2502 self._argClass[argName] = kwargs['type']
2503 else:
2504 self._argClass[argName] = None
2505
2506 # Remove kwargs which are not understood by ArgumentParser.add_argument()
2507 strippedArgs = {}
2508 for arg in ('group',):
2509 if arg in kwargs:
2510 strippedArgs[arg] = kwargs.pop(arg)
2511
2512 # Setup aliases
2513 if len(args) > 1:
2514 for i in range(1, len(args)):
2515 argAlias = args[i].lstrip('-')
2516 msg.debug('Adding an alias of {0}: {1}'.format(argName, argAlias))
2517 self._argAlias[argAlias] = argName
2518
2519 # Optinally add an argument to an argparse argument group
2520 if 'group' in strippedArgs:
2521 if strippedArgs['group'] in self._argGroups:
2522 msg.debug('Adding argument to group {0}: ({1}; {2})'.format(strippedArgs['group'], args, kwargs))
2523 self._argGroups[strippedArgs['group']].add_argument(*args, **kwargs)
2524 self._argKeyGroups[argName] = strippedArgs['group']
2525 else:
2526 msg.warning('Argument group {0} not defined - adding argument to main parser'.format(strippedArgs['group']))
2527 msg.debug('Adding argument: ({0}; {1})'.format(args, kwargs))
2528 super(trfArgParser, self).add_argument(*args, **kwargs)
2529 else:
2530 msg.debug('Adding argument: ({0}; {1})'.format(args, kwargs))
2531 super(trfArgParser, self).add_argument(*args, **kwargs)
2532
2533 @property
2535 desc = {}
2536 for name, argClass in self._argClass.items():
2537 msg.debug('Detected the local variable {0}'.format(name))
2538 if argClass is not None:
2539 desc[name] = argClass().prodsysDescription
2540 if name in self._helpString:
2541 desc[name].update({'help': self._helpString[name]})
2542 if name in self._argKeyGroups:
2543 desc[name].update({'group':self._argKeyGroups[name]})
2544 return desc
2545
2546
2547 def defineArgGroup(self, *args):
2548 # Get an argparse group
2549 if args[0] in self._argGroups:
2550 msg.warning('Argument group %s already exists', args[0])
2551 return
2552 self._argGroups[args[0]] = self.add_argument_group(*args)
2553
2554
2555 def getHelpString(self, argument):
2556 try:
2557 return(self._helpString[argument])
2558 except KeyError:
2559 raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_KEY_ERROR'), 'no help string available for argument %s' %argument)
2560 return None
2561
2562
2564 def dumpArgs(self):
2565 keyArray = [ '--' + str(key) for key in self._helpString if key not in ('h', 'verbose', 'loglevel', 'dumpargs', 'argdict') ]
2566 keyArray.sort()
2567 print('ListOfDefaultPositionalKeys={0}'.format(keyArray))
2568
2569
    @property
    def allArgs(self):
        """Return the list of all argument names known to this parser."""
        return list(self._helpString)
2573
    # @brief parsing helper
    def _parse_list_helper(self, value):
        """Concatenate a list of argument value objects into one (object, values) pair.

        @param value List of argument instances (or a nested list of such lists)
        @return Tuple (valueObject, mergedValues); the caller assigns mergedValues
        back onto valueObject
        """
        # We build on the value[0] instance as this contains the correct metadata
        # and object references for this instance (shallow copying can
        # mess up object references and deepcopy throws exceptions!)
        newValueObj = value[0]
        msg.debug('Started with: %s = %s', type(newValueObj), newValueObj)
        if isinstance(value[0], argSubstep):
            # Make sure you do not have a reference to the original value - this is a deeper copy
            newValues = dictSubstepMerge(value[0].value, {})
        elif isinstance(value[0], list):
            # A nested list: recurse into each element, merging each sub-list
            if len(value) == 1:
                return self._parse_list_helper(value[0])
            msg.debug('Handling a list of arguments for key')
            newValues = []
            for v in value:
                processedValueObj, processedValues = self._parse_list_helper(v)
                processedValueObj.value = processedValues
                newValues.append(processedValueObj)
            newValueObj = newValues
            return newValueObj, newValues
        elif isinstance(value[0].value, list):
            # NOTE(review): newValues aliases value[0].value here (no copy), so the
            # extend()/update() below mutates the first argument object in place -
            # presumably intended; verify against callers.
            newValues = value[0].value
        elif isinstance(value[0].value, dict):
            newValues = value[0].value
        else:
            newValues = [value[0].value,]
        for valueObj in value[1:]:
            msg.debug('Value Object: %s = %s', type(valueObj), valueObj)
            if isinstance(value[0], argSubstep):
                # Special merger for lists attached to substeps
                newValues = dictSubstepMerge(newValues, valueObj.value)
            elif isinstance(valueObj.value, list):
                # General lists are concatenated
                newValues.extend(valueObj.value)
            elif isinstance(valueObj.value, dict):
                # General dictionaries are merged
                newValues.update(valueObj.value)
            else:
                newValues.append(valueObj.value)
        return newValueObj, newValues
2615
2616
2621 def parse_args(self, args = None, namespace = None):
2622 if namespace:
2623 super(trfArgParser, self).parse_args(args = args, namespace = namespace)
2624 else:
2625 namespace = super(trfArgParser, self).parse_args(args = args)
2626 for k, v in namespace.__dict__.items():
2627 msg.debug('Treating key %s (%s)', k, v)
2628 if isinstance(v, list):
2629 newValueObj, newValues = self._parse_list_helper(v)
2630 if not isinstance(newValueObj, list):
2631 newValueObj.value = newValues
2632 namespace.__dict__[k] = newValueObj
2633 msg.debug('Set to %s', newValues)
2634
2635 return namespace
2636
2637
2638
def strToBool(string):
    """Small utility to convert a string value to a boolean.

    @param string Value to convert; must be a string reading 'true' or 'false' (any case)
    @return The corresponding boolean
    @throws trfExceptions.TransformArgException if the value cannot be converted
    """
    # Lazy %-style logging arguments avoid formatting when DEBUG is disabled
    msg.debug("converting string %s to boolean", string)
    try:
        lowered = string.lower()
    except AttributeError:
        # Not a string at all - fall through to the single failure raise below
        lowered = None
    if lowered == 'true':
        return True
    if lowered == 'false':
        return False
    raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'), 'Failed to convert value {0} to bool'.format(string))
2650
2651
def dictSubstepMerge(dict1, dict2):
    """Special dictionary merger used for substep type arguments.

    List values are concatenated per substep; scalar values must agree when a
    substep appears in both inputs.
    @throws trfExceptions.TransformArgException when scalar values clash
    """
    allKeys = set(dict1) | set(dict2)
    # Probe the first value of whichever dict has content to decide if we are
    # merging lists - lists are special...
    probe = dict1 if len(dict1) > 0 else dict2
    listType = bool(probe) and isinstance(next(iter(probe.values())), list)
    if listType:
        # Missing substeps contribute an empty list
        return {key: dict1.get(key, []) + dict2.get(key, []) for key in allKeys}
    mergeDict = {}
    for key in allKeys:
        if key in dict1 and key in dict2 and dict1[key] != dict2[key]:
            # Don't really know what to do if these clash...
            raise trfExceptions.TransformArgException(trfExit.nameToCode('TRF_ARG_CONV_FAIL'),
                                                      'Merging substep arguments found clashing values for substep {0}: {1}!={2}'.format(key, dict1[key], dict2[key]))
        mergeDict[key] = dict1[key] if key in dict1 else dict2[key]

    return mergeDict
2688
2689
int upper(int c)
if(febId1==febId2)
void print(char *figname, TCanvas *c1)
STL class.
Factory class used to generate argparse actions, actions should be used when arguments are used witho...
__call__(self, option_strings, dest, **kwargs)
__init__(self, genclass, *args, **kwargs)
__init__(self, factory, option_strings, dest, **kwargs)
__call__(self, parser, namespace, values, option_string=None)
_callAthInfo(self, files, doAllFiles, retrieveKeys)
Workhorse which actually calls Meta Reader.
__init__(self, value=list(), type=None, subtype=None, io='output', splitter=',', runarg=True, multipleOK=None, name=None, executor=list(), mergeTargetSize=-1, auxiliaryFile=False)
_getAthInfo(self, files)
Small wrapper which sets the standard options for doAllFiles and retrieveKeys.
_getIntegrity(self, files)
File integrity checker.
selfMerge(self, output, inputs, counter=0, argdict={})
Method which can be used to merge files of this type.
_getIntegrity(self, files)
File integrity checker.
Boolean type argument.
selfMerge(self, output, inputs, counter=0, argdict={})
Method which can be used to merge EVNT files.
selfMerge(self, output, inputs, counter=0, argdict={})
Method which can be used to merge EVNT_TR files.
Class which defines a special input file format used in FTK simulation.
__init__(self, value=list(), io='output', type=None, splitter=',', runarg=True, multipleOK=None, name=None)
Factory class used to generate argument class instances for argparse.
__call__(self, valueString=None)
__init__(self, genclass, *args, **kwargs)
_getDatasetFromFilename(self, reset=False)
Look for dataset name in dataset.filename Tier0 convention.
_resetMetadata(self, files=[])
Resets all metadata files in this instance.
_readMetadata(self, files, metadataKeys)
Check metadata is in the cache or generate it if it's missing.
_getSize(self, files)
Determines the size of files.
mergeTargetSize(self)
mergeTargeSize value getter
getMetadata(self, files=None, metadataKeys=None, maskMetadataKeys=None, populate=True, flush=False)
Return specific keys for specific files.
_exists(self, files)
Try to determine if a file actually exists...
getSingleMetadata(self, fname, metadataKey, populate=True, flush=False)
Convenience function to extract a single metadata key for a single file.
_generateGUID(self, files)
Generate a GUID on demand - no intrinsic for this file type.
multipleOK(self)
multipleOK getter
str _urlType
Input file globbing and expansion.
executor(self)
Executor status getter.
_getIntegrity(self, files)
File integrity checker.
__init__(self, value=list(), type=None, subtype=None, io='output', splitter=',', runarg=True, guid=None, multipleOK=None, name=None, executor=list(), mergeTargetSize=-1, auxiliaryFile=False)
Initialise an argFile.
_setMetadata(self, files=None, metadataKeys={})
Set metadata values into the cache.
nentries(self)
Return total number of events in all constituent files.
__str__(self)
String representation of a file argument.
isCached(self, files=None, metadataKeys=None)
Test if certain metadata elements are already cached.
getnentries(self, fast=False)
Explicit getter, offering fast switch.
_mergeArgs(self, argdict, copyArgs=None)
Utility to strip arguments which should not be passed to the selfMerge methods of our child classes.
str _io
Input file globbing and expansion.
metadata(self)
Returns the whole kit and kaboodle...
valueSetter(self, value)
Set the argFile value, but allow parameters here.
__init__(self, value=None, min=None, max=None, runarg=True, name=None)
Float argument constructor.
Data quality histogram file class.
__init__(self, value=list(), io='output', type=None, subtype=None, splitter=',', runarg=True, countable=True, multipleOK=None, name=None, auxiliaryFile=False)
_getIntegrity(self, files)
There is no integrity check for HIST files - return 'UNDEFINED'.
selfMerge(self, output, inputs, counter=0, argdict={})
Method which can be used to merge HITS files.
__init__(self, value=list(), io='output', type='txt_evt', splitter=',', runarg=True, multipleOK=None, name=None)
__init__(self, value={}, supressEmptyStrings=True, splitter=',', kvsplitter=":", runarg=True, name=None)
Dictionary of key value arguments, where the values are floats.
__init__(self, value=list(), io='output', type=None, splitter=',', runarg=True, multipleOK=None, name=None)
List of string arguments.
__init__(self, value=[], supressEmptyStrings=True, splitter=',', runarg=True, name=None)
List of string arguments.
__str__(self)
String conversion.
append(self, addme)
Append a value to the list.
__repr__(self)
Repr conversion.
NTUP (plain ROOT) file class.
selfMerge(self, output, inputs, counter=0, argdict={})
__init__(self, value=list(), io='output', type=None, subtype=None, splitter=',', treeNames=None, runarg=True, multipleOK=None, name=None, mergeTargetSize=-1, auxiliaryFile=False)
selfMerge(self, output, inputs, counter=0, argdict={})
Method which can be used to merge files of this type.
_getIntegrity(self, files)
File integrity checker.
selfMerge(self, output, inputs, counter=0, argdict={})
Method which can be used to merge RDO files.
choices(self)
Choices getter.
__init__(self, value=None, runarg=True, name=None, choices=None)
Class initialisation.
Substep class for conditionsTag.
__init__(self, value=None, min=None, max=None, runarg=True, name=None)
Argument class for substep lists, suitable for preExec/postExec.
__init__(self, value=None, runarg=True, name=None, defaultSubstep='all', splitter=None, separator=':')
argSubstepList constructor
_parseStringAsSubstep(self, value)
Specialist parser for lists, which applies the splitter string, if defined.
Special argument class to hold steering information.
Base class for substep arguments.
_parseStringAsSubstep(self, string)
Parse a string for substep:value format.
returnMyValue(self, name=None, substep=None, first=False, exe=None)
Return the value of this substep arg for an executor with the given parameters.
__init__(self, value=None, runarg=True, name=None, defaultSubstep='all', separator=':')
argSubstep constructor
__init__(self, value=list(), io='output', type=None, splitter=',', runarg=True, multipleOK=None, name=None)
Basic argument class holding a value which can be get and set.
__repr__(self)
Repr conversion of our value.
__init__(self, value=None, runarg=True, name=None)
Initialise argument class.
__eq__(self, other)
Comparison is based on value attribute.
isRunarg(self)
Return runarg status.
__str__(self)
String conversion of our value.
defineArgGroup(self, *args)
Define an argparse argument group for the main parser to use.
getHelpString(self, argument)
Return the help string for a given argument.
allArgs(self)
Getter for argument list.
dumpArgs(self)
Return a list of all arguments understood by this transform in prodsys style.
add_argument(self, *args, **kwargs)
parse_args(self, args=None, namespace=None)
Call argument_parser parse_args, then concatenate values.
__init__(self, *args, **kwargs)
Subclassing argparse.
Group of argument based exceptions.
Base class for file merging exceptions.
Exception used by metadata functions.
Exception used by time limited executions.
STL class.
std::string replace(std::string s, const std::string &s2, const std::string &s3)
Definition hcg.cxx:310
std::vector< std::string > split(const std::string &s, const std::string &t=":")
Definition hcg.cxx:177
Utilities for configuration of transforms via AMI tags.
Transform execution functions.
Module for transform exit codes.
Transform utilities to deal with files.
Transform utility functions.
strToBool(string)
Small utility to convert a string value to a boolean.
dictSubstepMerge(dict1, dict2)
special dictionary merger which is used for substep type arguments