ATLAS Offline Software
Functions | Variables
python.TransformUtils Namespace Reference

Functions

def ExecCondAlgsAtPreFork (flags, cfg)
 
def executeFromFragment (fragment_string, flags, cfg=None)
 
def processPreExec (runArgs, flags)
 
def processPostExec (runArgs, flags, cfg)
 
def processPreInclude (runArgs, flags)
 
def processPostInclude (runArgs, flags, cfg)
 
def UseFrontier (flags)
 
def DumpPickle (flags, cfg)
 
def SortInput (flags, cfg)
 

Variables

 msg
 

Function Documentation

◆ DumpPickle()

def python.TransformUtils.DumpPickle (   flags,
  cfg 
)
Dump the pickle file for the current configuration

Definition at line 100 of file Tools/PyJobTransforms/python/TransformUtils.py.

def DumpPickle(flags, cfg):
    """Write the current configuration to ``Configuration.pkl``.

    The file is created (or overwritten) in the current working directory
    and the actual serialisation is delegated to ``cfg.store``.

    ``flags`` is part of the signature but is not used by this function.
    """
    pickle_name = "Configuration.pkl"
    # binary mode: the open file handle is handed straight to cfg.store
    with open(pickle_name, "wb") as pickle_file:
        cfg.store(pickle_file)
104 
105 

◆ ExecCondAlgsAtPreFork()

def python.TransformUtils.ExecCondAlgsAtPreFork (   flags,
  cfg 
)
Execute the entire conditions sequence during the PreFork step.
This allows saving some memory in AthenaMP jobs, which is particularly
useful in the derivation production.

Definition at line 5 of file Control/AthenaServices/python/TransformUtils.py.

def ExecCondAlgsAtPreFork(flags, cfg):
    """Request execution of the whole conditions sequence at PreFork.

    Sets ``ExecAtPreFork`` of the ``AthMpEvtLoopMgr`` service to
    ``['AthCondSeq']``.  This allows saving some memory in AthenaMP jobs,
    which is particularly useful in the derivation production.

    If looking up or configuring the service raises ``ConfigurationError``
    the job is left untouched and only an informational message is logged.

    ``flags`` is part of the signature but is not used by this function.
    """
    logger = logging.getLogger('ExecCondAlgsAtPreFork')
    try:
        cfg.getService('AthMpEvtLoopMgr').ExecAtPreFork = ['AthCondSeq']
    except ConfigurationError:
        # in this context a missing service is not necessarily an error
        logger.info('AthCondSeq will NOT be executed during PreFork')
    else:
        logger.info('AthCondSeq will be executed during PreFork')

◆ executeFromFragment()

def python.TransformUtils.executeFromFragment (   fragment_string,
  flags,
  cfg = None 
)
Execute a function from a pre/post include fragment.

Definition at line 6 of file Tools/PyJobTransforms/python/TransformUtils.py.

def executeFromFragment(fragment_string, flags, cfg=None):
    """Execute a function from a pre/post include fragment.

    ``fragment_string`` names the callable as ``Package.Module.Function``
    (or ``Package.Function`` if it is defined in ``__init__.py``).  The
    module part is imported and the function looked up on it.

    * Without ``cfg`` (pre-include) the function is called as
      ``function(flags)`` and its result is discarded.
    * With ``cfg`` (post-include) the number of arguments without a default
      decides the calling convention: exactly one means the function is
      called as ``function(flags)`` and its result is merged into ``cfg``;
      otherwise it is called as ``function(flags, cfg)``.

    Raises:
        ValueError: if ``fragment_string`` contains no ``.`` separator.
        ImportError / AttributeError: propagated from the module import or
            the function lookup when the fragment cannot be resolved.
    """
    # detect legacy job options in the transition period:
    if fragment_string.endswith('.py'):
        msg.warning(f'Trying to load legacy job options {fragment_string}. This should NOT be used in production!')
        # turn "Package/file.py" into the dotted form "Package.file"
        fragment_string = fragment_string[:-3]
        fragment_string = fragment_string.replace('/', '.')
        msg.warning(f'Resolved to {fragment_string}')

    parts = fragment_string.split('.')
    if len(parts) < 2:
        raise ValueError('Pre/post include should be of the form Package.Module.Function or Package.Function if defined in __init__.py')

    function = parts[-1]
    module = '.'.join(parts[:-1])

    from importlib import import_module
    loaded_module = import_module(module)
    function_def = getattr(loaded_module, function)

    if not cfg:
        function_def(flags)
        return

    from inspect import getfullargspec
    argspec = getfullargspec(function_def)
    # was an unconditional print(); keep the information but only at debug level
    msg.debug('Argument specification of %s: %s', fragment_string, argspec)
    argCount = len(argspec.args)
    defaultsCount = len(argspec.defaults) if argspec.defaults else 0
    if argCount - defaultsCount == 1:
        # function(flags) -> accumulator that has to be merged by us
        cfg.merge(function_def(flags))
    else:
        # function(flags, cfg) modifies the accumulator in place
        function_def(flags, cfg)
39 
40 

◆ processPostExec()

def python.TransformUtils.processPostExec (   runArgs,
  flags,
  cfg 
)
Process postExec from runtime arguments.

Definition at line 50 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPostExec(runArgs, flags, cfg):
    """Process postExec from runtime arguments.

    Every entry of ``runArgs.postExec`` is passed to ``exec`` inside this
    function.  Nothing is executed when the attribute is missing, empty or
    equal to ``'NONE'``.

    Raises:
        RuntimeError: if ``flags`` are not locked yet.
    """
    if not flags.locked():
        raise RuntimeError('Running a postExec before locking ConfigFlags')

    if not hasattr(runArgs, 'postExec'):
        return
    if not runArgs.postExec or runArgs.postExec == 'NONE':
        return

    # These names look unused but are bound on purpose: together with
    # runArgs, flags and cfg they are the locals in scope for the exec'd code.
    ConfigFlags = flags # noqa: F841
    from AthenaConfiguration.ComponentFactory import CompFactory # noqa: F401
    for cmd in runArgs.postExec:
        # NOTE(review): executes arbitrary code taken from the runtime arguments
        exec(cmd)
60 
61 

◆ processPostInclude()

def python.TransformUtils.processPostInclude (   runArgs,
  flags,
  cfg 
)
Process postInclude from runtime arguments.

Definition at line 69 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPostInclude(runArgs, flags, cfg):
    """Process postInclude from runtime arguments.

    Each entry of ``runArgs.postInclude`` is handed to
    ``executeFromFragment`` together with ``flags`` and ``cfg``.  Nothing is
    done when the attribute is missing, empty or equal to ``'NONE'``.

    Raises:
        RuntimeError: if ``flags`` are not locked yet.
    """
    if not flags.locked():
        raise RuntimeError('Running a postInclude before locking ConfigFlags')

    fragments = getattr(runArgs, 'postInclude', None)
    if not fragments or fragments == 'NONE':
        return

    for fragment_name in fragments:
        executeFromFragment(fragment_name, flags, cfg)
77 
78 

◆ processPreExec()

def python.TransformUtils.processPreExec (   runArgs,
  flags 
)
Process preExec from runtime arguments.

Definition at line 41 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPreExec(runArgs, flags):
    """Process preExec from runtime arguments.

    Every entry of ``runArgs.preExec`` is passed to ``exec`` inside this
    function.  Nothing is executed when the attribute is missing, empty or
    equal to ``'NONE'``.
    """
    if not hasattr(runArgs, 'preExec'):
        return
    if not runArgs.preExec or runArgs.preExec == 'NONE':
        return

    # These names look unused but are bound on purpose: together with
    # runArgs and flags they are the locals in scope for the exec'd code.
    ConfigFlags = flags # noqa: F841
    from AthenaConfiguration.Enums import HIMode,BeamType # noqa: F401
    for cmd in runArgs.preExec:
        # NOTE(review): executes arbitrary code taken from the runtime arguments
        exec(cmd)
48 
49 

◆ processPreInclude()

def python.TransformUtils.processPreInclude (   runArgs,
  flags 
)
Process preInclude from runtime arguments.

Definition at line 62 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPreInclude(runArgs, flags):
    """Process preInclude from runtime arguments.

    Each entry of ``runArgs.preInclude`` is handed to
    ``executeFromFragment`` together with ``flags``.  Nothing is done when
    the attribute is missing, empty or equal to ``'NONE'``.
    """
    fragments = getattr(runArgs, 'preInclude', None)
    if not fragments or fragments == 'NONE':
        return

    for fragment_name in fragments:
        executeFromFragment(fragment_name, flags)
67 
68 

◆ SortInput()

def python.TransformUtils.SortInput (   flags,
  cfg 
)
Method that should be post-included after a typically configured Athena job.
It will read the original input files, create a list of Events in memory, sort them and produce
an intermediate Event Collection file that Athena will read instead of the original inputs.
Event information is read from EventInfoTags (stored by default in all Athena data files).
The default sort key value (Lumi) can be overridden, as can the sorting order.
The intermediate Collection file can be inspected using the CollQuery cmdline utility.

Definition at line 106 of file Tools/PyJobTransforms/python/TransformUtils.py.

def SortInput(flags, cfg, AthenaInputSortCollName=None, AthenaInputSortTag="LumiBlockN", AthenaInputSortOrder="Ascending"):
    """Make Athena read its input events in sorted order.

    Method that should be post-included after a typically configured Athena job.
    It reads the original input files (``EventSelector.InputCollections``),
    sorts the events with ``SortedCollectionCreator`` and produces an
    intermediate Event Collection file that Athena will read instead of the
    original inputs.  The ``EventSelector`` service is then pointed at that
    collection, with ``CollectionType`` set to ``"ExplicitROOT"``.
    The intermediate Collection file can be inspected using the CollQuery
    cmdline utility.

    Args:
        flags: configuration flags (not used by this function).
        cfg: accumulator that provides the ``EventSelector`` service.
        AthenaInputSortCollName: name of the intermediate collection, without
            the ``.root`` suffix.  Defaults to ``"sortedEventRefs<pid>"``.
        AthenaInputSortTag: attribute used as sort key.
        AthenaInputSortOrder: sort order passed on to the sorter.

    The three optional parameters replace look-ups in ``locals()``, which
    could never find an override inside a function.  They all have defaults,
    so the function still counts as a ``(flags, cfg)`` post-include.

    Returns:
        The ``cfg`` object that was passed in.
    """
    import os
    import subprocess

    eventSelector = cfg.getService("EventSelector")
    inputs = eventSelector.InputCollections

    # the default collection name is made unique per process via the PID
    tmpCollFile = AthenaInputSortCollName
    if tmpCollFile is None:
        tmpCollFile = "sortedEventRefs" + str(os.getpid())

    from CollectionUtilities.SortedCollectionCreator import SortedCollectionCreator
    sorter = SortedCollectionCreator(name="SortEvents")
    # Sort Inputs based on one of the EventInfoTag attributes
    # Store sorted event collection in a temporary file
    # This should run as postInclude, so we assume EventSelector.InputCollections is set earlier
    sorter.execute(inputs, outputCollection=tmpCollFile,
                   sortAttribute=AthenaInputSortTag, sortOrder=AthenaInputSortOrder)

    # Reading Events through References require a populated FileCatalog
    for inpfile in inputs:
        # Argument list instead of a shell string: file names containing
        # spaces or shell metacharacters are passed through unmodified.
        try:
            subprocess.run(['pool_insertFileToCatalog', str(inpfile)], check=False)
        except OSError as err:
            # stay best-effort, as the former os.system() call was
            msg.warning('Could not run pool_insertFileToCatalog for %s: %s', inpfile, err)

    # Tell Athena to use the sorted collection instead of the original inputs
    eventSelector.InputCollections = [tmpCollFile + ".root"]
    eventSelector.CollectionType = "ExplicitROOT"
    return cfg

◆ UseFrontier()

def python.TransformUtils.UseFrontier (   flags)
Switch database to using FRONTIER, but with a fallback
to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)

Move from RecJobTransforms to PyJobTransforms to enable use
in simulation ATN and KV jobs

Definition at line 79 of file Tools/PyJobTransforms/python/TransformUtils.py.

def UseFrontier(flags):
    """Switch database to using FRONTIER, but with a fallback
    to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)

    A new ``ComponentAccumulator`` is always returned.  A ``DBReplicaSvc``
    with ``COOLSQLiteVetoPattern='DBRelease'`` is added to it only when the
    ``FRONTIER_SERVER`` environment variable is set to a non-empty value.

    Moved from RecJobTransforms to PyJobTransforms to enable use
    in simulation ATN and KV jobs.

    ``flags`` is part of the signature but is not used by this function.
    """
    from os import environ
    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
    from AthenaConfiguration.ComponentFactory import CompFactory

    acc = ComponentAccumulator()

    if not environ.get('FRONTIER_SERVER'):
        # no FRONTIER server configured: return the empty accumulator
        msg.info('Using default DB access')
        return acc

    msg.info('Enabling FRONTIER DB access')
    replicaSvc = CompFactory.DBReplicaSvc(COOLSQLiteVetoPattern='DBRelease')
    acc.addService(replicaSvc)
    return acc
98 
99 

Variable Documentation

◆ msg

python.TransformUtils.msg
python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.TransformUtils.DumpPickle
def DumpPickle(flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:100
python.TransformUtils.UseFrontier
def UseFrontier(flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:79
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
python.TransformUtils.ExecCondAlgsAtPreFork
def ExecCondAlgsAtPreFork(flags, cfg)
Definition: Control/AthenaServices/python/TransformUtils.py:5
python.TransformUtils.SortInput
def SortInput(flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:106
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
Trk::open
@ open
Definition: BinningType.h:40
python.TransformUtils.executeFromFragment
def executeFromFragment(fragment_string, flags, cfg=None)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:6
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
str
Definition: BTagTrackIpAccessor.cxx:11
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70