3 from copy
import copy, deepcopy
4 from difflib
import get_close_matches
5 from enum
import EnumMeta
8 from AthenaCommon.Logging
import logging
9 from PyUtils.moduleExists
import moduleExists
10 from PyUtils.Decorators
import deprecate
12 _msg = logging.getLogger(
'AthConfigFlags')
15 """Return whether or not this is a gaudi-based (athena) environment"""
20 """The base flag object.
22 A flag can be set to either a fixed value or a callable, which computes
23 the value based on other flags.
26 __slots__ = [
'_value',
'_setDef',
'_type',
'_help']
32 def __init__(self, default, type=None, help=None):
33 """Initialise the flag with the default value.
35 Optionally set the type of the flag value and the help string.
38 raise ValueError(
"Default value of a flag must not be None")
45 """Set the value of the flag.
47 Can be a constant value or a callable.
58 def get(self, flagdict=None):
59 """Get the value of the flag.
61 If the currently set value is a callable, a dictionary of all available
62 flags needs to be provided.
65 if self.
_value is not None:
66 return deepcopy(self.
_value)
74 raise RuntimeError(
"Flag is using a callable but all flags are not available.")
87 return deepcopy(value)
90 if self.
_value is not None:
96 if (self.
_type is None or value
is None or
97 isinstance(value, self.
_type)
or
101 raise TypeError(f
"Flag is of type '{self._type.__name__}', "
102 f
"but value '{value}' of type '{type(value).__name__}' set.")
106 """Flags to dict converter
108 Used by both FlagAddress and AthConfigFlags. The input must be an
109 iterator over flags to be included in the dict.
113 for key, item
in iterator:
115 subkeys = key.split(
'.')
116 for subkey
in subkeys[:-1]:
117 x = x.setdefault(subkey,{})
118 x[subkeys[-1]] = item
123 if isinstance(f, AthConfigFlags):
125 rname = self.
_flags._renames.get(name, name)
128 elif isinstance(f, FlagAddress):
130 name = f._name+
"."+name
131 rname = self.
_flags._renames.get(name, name)
135 return getattr(self.
_flags, self.
_name +
"." + name)
138 if name.startswith(
"_"):
139 return object.__setattr__(self, name, value)
140 merged = self.
_name +
"." + name
142 if not self.
_flags.hasFlag( merged ):
143 self.
_flags._loadDynaFlags( merged )
145 if not self.
_flags.hasFlag( merged ):
146 raise RuntimeError(
"No such flag: {} The name is likely incomplete.".
format(merged) )
147 return self.
_flags._set( merged, value )
153 raise RuntimeError(
"No such flag: "+ self.
_name+
". The name is likely incomplete." )
162 raise RuntimeError(
"No such flag: "+ self.
_name+
". The name is likely incomplete." )
165 return getattr(self, name)
168 setattr(self, name, value)
171 merged = self.
_name +
"." + name
175 self.
_flags.loadAllDynamicFlags()
176 rmap = self.
_flags._renamed_map()
178 for flag
in self.
_flags._flagdict.keys():
179 if flag.startswith(self.
_name.rstrip(
'.') +
'.'):
180 for newflag
in rmap[flag]:
181 ntrim = len(self.
_name) + 1
182 n_dots_in = flag[:ntrim].
count(
'.')
183 remaining = newflag.split(
'.')[n_dots_in]
184 if remaining
not in used:
189 """Subflag iterator specialized for this address
192 self.
_flags.loadAllDynamicFlags()
194 rename = self.
_flags._renamed_map()
195 for key
in self.
_flags._flagdict.keys():
196 if key.startswith(address.rstrip(
'.') +
'.'):
197 ntrim = len(address) + 1
198 remaining = key[ntrim:]
199 for r
in rename[key]:
200 yield r, getattr(self, remaining)
203 """Convert to a python dictionary
205 Recursively convert this flag and all subflags into a
206 structure of nested dictionaries. All dynamic flags are
207 resolved in the process.
209 The resulting data structure should be easy to serialize as
234 raise RuntimeError(
"Cannot calculate hash of unlocked flag container")
235 elif self.
_hash is None:
240 raise DeprecationWarning(
"__hash__ method in AthConfigFlags is deprecated. Probably called from function decorator, use AccumulatorCache decorator instead.")
247 _flagdict = object.__getattribute__(self,
"_flagdict")
250 if name
in _flagdict:
251 return self.
_get(name)
260 if name
in _flagdict:
261 return self.
_get(name)
267 raise AttributeError(f
"No such flag: {name}")
270 if name.startswith(
"_"):
271 return object.__setattr__(self, name, value)
274 return self.
_set(name, value)
275 raise RuntimeError(
"No such flag: "+ name+
". The name is likely incomplete." )
281 return getattr(self, name)
284 setattr(self, name, value)
290 if key.startswith(name):
300 first = r.split(
'.',1)[0]
301 if first
not in used:
306 """Convert to a python dictionary
308 This is identical to the `asdict` in FlagAddress, but for all
316 """mapping from the old names to the new names
318 This is the inverse of _renamed, which maps new names to old
321 Returns a list of the new names corresponding to the old names
322 (since cloneAndReplace may or may not disable access to the old name,
323 it is possible that an old name renames to multiple new names)
328 if old
not in revmap:
329 revmap[old] = [ new ]
331 revmap[old] += [ new ]
334 for old, newlist
in revmap.items():
335 if key.startswith(old +
'.'):
336 stem = key.removeprefix(old)
337 return [ f
'{new}{stem}' if new
else '' for new
in newlist ]
343 """Subflag iterator for all flags
345 This is used by the asdict() function.
358 yield new, getattr(self, old)
359 except ModuleNotFoundError
as err:
360 _msg.debug(f
'missing module: {err}')
363 def addFlag(self, name, setDef, type=None, help=None):
366 raise KeyError(
"Duplicated flag name: {}".
format( name ))
372 The path is the beginning of the flag name (e.g. "X" for flags generated with name "X.*").
373 The generator is a function that returns a flags container, the flags have to start with the same path.
374 When the prefix is True the flags created by the generator are prefixed by "path".
376 Supported calls are then:
377 addFlagsCategory("A", g) - where g is a function creating flags like this: f.addFlag("A.x", someValue)
378 addFlagsCategory("A", g, True) - when flags are defined in g like this: f.addFlag("x", someValue),
379 The latter option allows to share one generator among flags that are later loaded in different paths.
382 _msg.debug(
"Adding flag category %s", path)
386 """ public interface for _loadDynaFlags """
391 loads the flags of the form "A.B.C" first attempting the path "A" then "A.B" and then "A.B.C"
394 def __load_impl( flagBaseName ):
395 if flagBaseName
in self.
_loaded:
396 _msg.debug(
"Flags %s already loaded",flagBaseName )
399 _msg.debug(
"Dynamically loading the flags under %s", flagBaseName )
404 generator, prefix = self.
_dynaflags[flagBaseName]
411 pathfrags = name.split(
'.')
412 for maxf
in range(1, len(pathfrags)+1):
413 __load_impl(
'.'.
join(pathfrags[:maxf]) )
416 """Force load all the dynamic flags """
435 if f.startswith(name+
'.'):
439 if c.startswith(name):
453 closestMatch = get_close_matches(name,self.
_flagdict.
keys(),1)
454 raise KeyError(f
"No flag with name '{name}' found" +
455 (f
". Did you mean '{closestMatch[0]}'?" if closestMatch
else ""))
461 closestMatch = get_close_matches(name,self.
_flagdict.
keys(),1)
462 raise KeyError(f
"No flag with name '{name}' found" +
463 (f
". Did you mean '{closestMatch[0]}'?" if closestMatch
else ""))
465 @
deprecate(
"Use '[...]' rather than '(...)' to access flags", print_context=
True)
467 return self.
_get(name)
481 raise RuntimeError(
"Attempt to modify locked flag container")
487 """Return an unlocked copy of self (dynamic flags are not loaded)"""
491 cln._renames = deepcopy(self.
_renames)
497 This is to replace subsets of configuration flags like
500 newflags = flags.cloneAndReplace('Muon', 'Trigger.Offline.Muon')
503 _msg.debug(
"cloning flags and replacing %s by %s", subsetToReplace, replacementSubset)
508 subsetToReplace = subsetToReplace.strip(
".")
509 replacementSubset = replacementSubset.strip(
".")
512 if (subsetToReplace == replacementSubset):
513 raise RuntimeError(f
'Can not replace flags {subsetToReplace} with themselves')
517 if src ==
"":
continue
518 if src+
"." in subsetToReplace:
519 raise RuntimeError(f
'Can not replace flags {subsetToReplace} by {replacementSubset} because of already present replacement of {alias} by {src}')
522 newFlags =
copy(self)
523 newFlags._renames = deepcopy(self.
_renames)
525 if replacementSubset
in newFlags._renames:
526 newFlags._renames[subsetToReplace] = newFlags._renames[replacementSubset]
528 newFlags._renames[subsetToReplace] = replacementSubset
531 if replacementSubset
not in newFlags._renames
or newFlags._renames[replacementSubset] == replacementSubset:
532 newFlags._renames[replacementSubset] =
""
534 del newFlags._renames[replacementSubset]
539 if replacementSubset
not in newFlags._renames:
540 newFlags._renames[replacementSubset] = replacementSubset
542 newFlags._hash =
None
546 def join(self, other, prefix=''):
548 Merges two flag containers
549 When the prefix is passed each flag from the "other" is prefixed by "prefix."
553 for (name,flag)
in other._flagdict.items():
554 fullName = prefix+
"."+name
if prefix !=
"" else name
556 raise KeyError(
"Duplicated flag name: {}".
format( fullName ) )
559 for (name,loader)
in other._dynaflags.items():
560 fullName = prefix+
"."+name
if prefix !=
"" else name
562 raise KeyError(
"Duplicated dynamic flags name: {}".
format( fullName ) )
563 _msg.debug(
"Joining dynamic flags with %s", fullName)
567 def dump(self, pattern=".*", evaluate=False, formatStr="{:40} : {}
", maxLength=None):
569 compiled = re.compile(pattern)
def truncate(s):
    """Shorten ``s`` to ``maxLength`` characters and mark a cut with "...".

    ``maxLength`` is read from the enclosing scope.  When it is ``None`` the
    slice ``s[:None]`` is the whole string, so nothing is removed and no
    marker is appended.
    """
    clipped = s[:maxLength]
    # Append the marker only when a limit is set and the input exceeded it.
    if maxLength and len(s) > maxLength:
        return clipped + "..."
    return clipped
571 reverse_renames = {value: key
for key, value
in self.
_renames.
items()
if value !=
''}
574 if any([name.startswith(r)
for r
in reverse_renames.keys()]):
575 for oldprefix, newprefix
in reverse_renames.items():
576 if name.startswith(oldprefix):
577 renamed = name.replace(oldprefix, newprefix)
579 if compiled.match(renamed):
588 except Exception
as e:
594 print(
"Flag categories that can be loaded dynamically")
595 print(
"{:25} : {:>30} : {}".
format(
"Category",
"Generator name",
"Defined in" ) )
597 if compiled.match(name):
598 print(
"{:25} : {:>30} : {}".
format( name, gen_and_prefix[0].__name__,
'/'.
join(gen_and_prefix[0].__code__.co_filename.split(
'/')[-2:]) ) )
600 print(
"Flag categories that are redirected by the cloneAndReplace")
602 print(
"{:30} points to {:>30} ".
format( alias, src
if src
else "nothing") )
607 Mostly a self-test method
616 Scripts calling AthConfigFlags.fillFromArgs can extend this parser, and pass their version to fillFromArgs
619 from AthenaCommon.AthOptionsParser
import getArgumentParser
621 parser.add_argument(
"---",dest=
"terminator",action=
'store_true', help=argparse.SUPPRESS)
634 """Fill the flags from a string of type key=value"""
638 key, value = flag_string.split(
"=")
640 raise ValueError(f
"Cannot interpret argument {flag_string}, expected a key=value format")
643 value = value.strip()
654 raise KeyError(f
"{key} is not a known configuration flag")
657 if flag_type
is None:
660 ast.literal_eval(value)
664 elif isinstance(flag_type, EnumMeta):
666 ENUM = importlib.import_module(flag_type.__module__)
667 value=f
"ENUM.{value}"
670 exec(f
"self.{key}{oper}{value}")
676 Used to set flags from command-line parameters, like flags.fillFromArgs(sys.argv[1:])
685 argList = listOfArgs
or sys.argv[1:]
692 unrequiredActions = []
693 if "-h" in argList
or "--help" in argList:
695 if "-h" in argList: argList.remove(
"-h")
696 if "--help" in argList: argList.remove(
"--help")
698 for a
in parser._actions:
700 unrequiredActions.append(a)
702 (args,leftover)=parser.parse_known_args(argList)
703 for a
in unrequiredActions: a.required=
True
706 argList = [a
for a
in argList
if a
not in leftover]
710 """Check if dest is available in parser and has been set"""
711 return vars(args).
get(dest,
None)
is not None
714 self.Exec.DebugStage=args.debug
716 if arg_set(
'evtMax'):
717 self.Exec.MaxEvents=args.evtMax
719 if arg_set(
'interactive'):
720 self.Exec.Interactive=args.interactive
722 if arg_set(
'skipEvents'):
723 self.Exec.SkipEvents=args.skipEvents
725 if arg_set(
'filesInput'):
726 self.Input.Files = []
727 for f
in args.filesInput.split(
","):
730 self.Input.Files += found
if found
else [f]
732 if arg_set(
'loglevel'):
733 from AthenaCommon
import Constants
734 self.Exec.OutputLevel = getattr(Constants, args.loglevel)
736 if arg_set(
'config_only')
and args.config_only
is not False:
737 from os
import environ
738 environ[
"PICKLECAFILE"] =
"" if args.config_only
is True else args.config_only
740 if arg_set(
'threads'):
741 self.Concurrency.NumThreads = args.threads
745 if args.concurrent_events
is None and self.Concurrency.NumConcurrentEvents==0:
746 self.Concurrency.NumConcurrentEvents = args.threads
748 if arg_set(
'concurrent_events'):
749 self.Concurrency.NumConcurrentEvents = args.concurrent_events
751 if arg_set(
'nprocs'):
752 self.Concurrency.NumProcs = args.nprocs
754 if arg_set(
'perfmon'):
755 from PerfMonComps.PerfMonConfigHelpers
import setPerfmonFlagsFromRunArgs
759 self.Exec.MTEventService = args.mtes
761 if arg_set(
'mtes_channel'):
762 self.Exec.MTEventServiceChannel = args.mtes_channel
764 if arg_set(
'profile_python'):
765 from AthenaCommon.Debugging
import dumpPythonProfile
766 import atexit, cProfile, functools
767 cProfile._athena_python_profiler = cProfile.Profile()
768 cProfile._athena_python_profiler.enable()
771 atexit.register(functools.partial(dumpPythonProfile, args.profile_python))
779 if do_help
and '=' not in arg:
780 argList += arg.split(
".")
786 if parser.epilog
is None: parser.epilog=
""
787 parser.epilog +=
" Note: Specify additional flags in form <flagName>=<value>."
788 subparsers = {
"":[parser,parser.add_subparsers(help=argparse.SUPPRESS)]}
790 logging.root.setLevel(logging.ERROR)
def getParser(category):
    """Return the ``[parser, subparsers_action]`` pair for a flag category.

    Pairs are created on demand and cached in ``subparsers`` (taken from the
    enclosing scope), keyed by the dotted category name.  The parent
    category, i.e. everything before the last ".", is resolved first by
    recursion; a name without a dot has the "" entry as its parent.
    """
    if category not in subparsers:
        # "A.B.C" -> parent "A.B", leaf "C"; "A" -> parent "", leaf "A".
        parent, _, leaf = category.rpartition(".")
        _, parent_action = getParser(parent)
        # The parent's sub-command section starts out hidden; give it a
        # visible heading once it receives its first sub-category.
        if parent_action.help == argparse.SUPPRESS:
            parent_action.help = "Flag subcategories:"
        child = parent_action.add_parser(
            leaf,
            help=f"{category} flags",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
            usage=argparse.SUPPRESS,
        )
        child._positionals.title = "flags"
        subparsers[category] = [child, child.add_subparsers(help=argparse.SUPPRESS)]
    return subparsers[category]
804 category,flagName = name.rsplit(
".",1)
if "." in name
else (
"",name)
807 val =
repr(flag.get(self))
810 if flag._help != argparse.SUPPRESS:
812 if flag._help
is not None:
813 helptext = f
": {flag._help}"
814 if flag._type
is not None:
815 helptext += f
' [type: {flag._type.__name__}]'
816 if val
is not None and helptext ==
"":
818 getParser(category)[0].add_argument(name, nargs=
'?', default=val, help=helptext)
820 parser._positionals.title =
'flags and positional arguments'
821 parser.parse_known_args(argList + [
"--help"])