ATLAS Offline Software
Functions | Variables
python.TransformUtils Namespace Reference

Functions

def ExecCondAlgsAtPreFork (flags, cfg)
 
def executeFromFragment (fragment_string, flags, cfg=None)
 
def processPreExec (runArgs, flags)
 
def processPostExec (runArgs, flags, cfg)
 
def processPreInclude (runArgs, flags)
 
def processPostInclude (runArgs, flags, cfg)
 
def UseFrontier (flags)
 
def UseCREST (flags)
 
def DumpPickle (flags, cfg)
 
def SortInput (flags, cfg)
 

Variables

 msg
 

Function Documentation

◆ DumpPickle()

def python.TransformUtils.DumpPickle (   flags,
  cfg 
)
Dump the pickle file for the current configuration

Definition at line 116 of file Tools/PyJobTransforms/python/TransformUtils.py.

116 def DumpPickle(flags, cfg):
117  """Dump the pickle file for the current configuration"""
118  with open("Configuration.pkl", "wb") as f:
119  cfg.store(f)
120 
121 

◆ ExecCondAlgsAtPreFork()

def python.TransformUtils.ExecCondAlgsAtPreFork (   flags,
  cfg 
)
Execute the entire conditions sequence during the PreFork step.
This allows saving some memory in AthenaMP jobs, which is particularly
useful in the derivation production.

Definition at line 5 of file Control/AthenaServices/python/TransformUtils.py.

5 def ExecCondAlgsAtPreFork(flags, cfg):
6  """ Execute the entire conditions sequence during the PreFork step.
7  This allows saving some memory in AthenaMP jobs, which is particularly
8  useful in the derivation production.
9  """
10  log = logging.getLogger('ExecCondAlgsAtPreFork')
11  try:
12  cfg.getService('AthMpEvtLoopMgr').ExecAtPreFork=['AthCondSeq']
13  log.info('AthCondSeq will be executed during PreFork')
14  except ConfigurationError: # in this context this might not be an error
15  log.info('AthCondSeq will NOT be executed during PreFork')

◆ executeFromFragment()

def python.TransformUtils.executeFromFragment (   fragment_string,
  flags,
  cfg = None 
)
Execute a function from a pre/post include fragment.

Definition at line 6 of file Tools/PyJobTransforms/python/TransformUtils.py.

6 def executeFromFragment(fragment_string, flags, cfg=None):
7  """Execute a function from a pre/post include fragment."""
8  # detect legacy job options in the transition period:
9  if fragment_string.endswith('.py'):
10  msg.warning(f'Trying to load legacy job options {fragment_string}. This should NOT be used in production!')
11  fragment_string = fragment_string[:-3]
12  fragment_string = fragment_string.replace('/', '.')
13  msg.warning(f'Resolved to {fragment_string}')
14 
15  parts = fragment_string.split('.')
16  if len(parts) < 2:
17  raise ValueError('Pre/post include should be of the form Package.Module.Function or Package.Function if defined in __init__.py')
18 
19  function = parts[-1]
20  module = '.'.join(parts[:-1])
21 
22  from importlib import import_module
23  loaded_module = import_module(module)
24  function_def = getattr(loaded_module, function)
25 
26  if not cfg:
27  function_def(flags)
28  return
29 
30  from inspect import getfullargspec
31  argspec = getfullargspec(function_def)
32  print(argspec)
33  argCount = len(argspec.args)
34  defaultsCount = len(argspec.defaults) if argspec.defaults else 0
35  if argCount - defaultsCount == 1:
36  cfg.merge(function_def(flags))
37  else:
38  function_def(flags, cfg)
39 
40 

◆ processPostExec()

def python.TransformUtils.processPostExec (   runArgs,
  flags,
  cfg 
)
Process postExec from runtime arguments.

Definition at line 50 of file Tools/PyJobTransforms/python/TransformUtils.py.

50 def processPostExec(runArgs, flags, cfg):
51  """Process postExec from runtime arguments."""
52  if not flags.locked():
53  raise RuntimeError('Running a postExec before locking ConfigFlags')
54 
55  if hasattr(runArgs, 'postExec') and runArgs.postExec and runArgs.postExec != 'NONE':
56  ConfigFlags = flags # noqa: F841
57  from AthenaConfiguration.ComponentFactory import CompFactory # noqa: F401
58  for cmd in runArgs.postExec:
59  exec(cmd)
60 
61 

◆ processPostInclude()

def python.TransformUtils.processPostInclude (   runArgs,
  flags,
  cfg 
)
Process postInclude from runtime arguments.

Definition at line 69 of file Tools/PyJobTransforms/python/TransformUtils.py.

69 def processPostInclude(runArgs, flags, cfg):
70  """Process postInclude from runtime arguments."""
71  if not flags.locked():
72  raise RuntimeError('Running a postInclude before locking ConfigFlags')
73 
74  if hasattr(runArgs, 'postInclude') and runArgs.postInclude and runArgs.postInclude != 'NONE':
75  for fragment in runArgs.postInclude:
76  executeFromFragment(fragment, flags, cfg)
77 
78 

◆ processPreExec()

def python.TransformUtils.processPreExec (   runArgs,
  flags 
)
Process preExec from runtime arguments.

Definition at line 41 of file Tools/PyJobTransforms/python/TransformUtils.py.

41 def processPreExec(runArgs, flags):
42  """Process preExec from runtime arguments."""
43  if hasattr(runArgs, 'preExec') and runArgs.preExec and runArgs.preExec != 'NONE':
44  ConfigFlags = flags # noqa: F841
45  from AthenaConfiguration.Enums import HIMode,BeamType # noqa: F401
46  for cmd in runArgs.preExec:
47  exec(cmd)
48 
49 

◆ processPreInclude()

def python.TransformUtils.processPreInclude (   runArgs,
  flags 
)
Process preInclude from runtime arguments.

Definition at line 62 of file Tools/PyJobTransforms/python/TransformUtils.py.

62 def processPreInclude(runArgs, flags):
63  """Process preInclude from runtime arguments."""
64  if hasattr(runArgs, 'preInclude') and runArgs.preInclude and runArgs.preInclude != 'NONE':
65  for fragment in runArgs.preInclude:
66  executeFromFragment(fragment, flags)
67 
68 

◆ SortInput()

def python.TransformUtils.SortInput (   flags,
  cfg 
)
Method that should be post-included after a typically configured Athena job.
It will read the original input files, create a list of Events in memory, sort them and produce
an intermediate Event Collection file that Athena will read instead of the original inputs.
Event information is read from EventInfoTags (stored by default in all Athena data files).
The default sort key value (Lumi) can be overridden, as can the sorting order.
The intermediate Collection file can be inspected using the CollQuery command-line utility.

Definition at line 122 of file Tools/PyJobTransforms/python/TransformUtils.py.

122 def SortInput(flags, cfg):
123  """Method that should be post-included after a typicaly configured Athena job
124  It will read original input files, create a list of Events in memory, sort them and produce
125  an intermediate Event Collection file that Athena will read instead of the original inputs
126  Event information is read from EventInfoTags (stored by default in all Athena data files)
127  The default sort key value (Lumi) can be oveerriden, as the sorting order
128  The intermediate Collection file can be inspected using CollQuery cmdline utility"""
129  import os
130  inputs = cfg.getService("EventSelector").InputCollections
131 
132  # set default sort parameters, read overrides from locals()
133  tmpCollFile = locals().get("AthenaInputSortCollName", "sortedEventRefs" + str(os.getpid()) )
134  sortTag = locals().get("AthenaInputSortTag", "LumiBlockN")
135  sortOrd = locals().get("AthenaInputSortOrder", "Ascending")
136 
137  from CollectionUtilities.SortedCollectionCreator import SortedCollectionCreator
138  sorter = SortedCollectionCreator(name="SortEvents")
139  # Sort Inputs based on one of the EventInfoTag attributes
140  # Store sorted event collection in a temporary file
141  # This should run as postInclude, so we assume EventSelector.InputCollections is set earlier
142 
143  # moved execution to a subprocess, because Gaudi messaging created by collections causes
144  # AppManager errors
145  rc = sorter.executeInSubprocess(inputs, outputCollection=tmpCollFile, sortAttribute=sortTag, sortOrder=sortOrd)
146  if rc != 0:
147  msg.error(f"Sorting failed with exit code: {rc}")
148 
149  # Reading Events through References require a populated FileCatalog
150  for inpfile in inputs:
151  os.system('pool_insertFileToCatalog {}'.format(inpfile))
152 
153  # Tell Athena to use the sorted collection instead of the original inputs
154  cfg.getService("EventSelector").InputCollections = [tmpCollFile + ".root"]
155  cfg.getService("EventSelector").CollectionType = "RootCollection"
156  return cfg

◆ UseCREST()

def python.TransformUtils.UseCREST (   flags)
PreInclude to switch to using CREST rather than COOL

Definition at line 103 of file Tools/PyJobTransforms/python/TransformUtils.py.

103 def UseCREST(flags):
104  """PreInclude to switch to using CREST rather than COOL
105  """
106  flags.IOVDb.UseCREST = True
107  from os import environ
108  msg.info('Enabling CREST DB access')
109  if environ.get('CREST_SERVER'):
110  flags.IOVDb.CrestServer = environ.get('CREST_SERVER')
111  else:
112  msg.info('CREST_SERVER environment variable not defined - using fall-back.')
113  msg.info(f'Using CrestServer: {flags.IOVDb.CrestServer}')
114 
115 

◆ UseFrontier()

def python.TransformUtils.UseFrontier (   flags)
Switch database to using FRONTIER, but with a fallback
to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)

Moved from RecJobTransforms to PyJobTransforms to enable use
in simulation ATN and KV jobs.

Definition at line 79 of file Tools/PyJobTransforms/python/TransformUtils.py.

79 def UseFrontier(flags):
80  """Switch database to using FRONTIER, but with a fallback
81  to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)
82 
83  Move from RecJobTransforms to PyJobTransforms to enable use
84  in simulation ATN and KV jobs
85  """
86  from os import environ
87  from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
88 
89  if flags.IOVDb.UseCREST:
90  raise RuntimeError('Using PyJobTransforms.UseCREST (setting flags.IOVDb.UseCREST = True) and PyJobTransforms.UseFrontier in the same job is not supported!')
91 
92  cfg = ComponentAccumulator()
93  if environ.get('FRONTIER_SERVER'):
94  msg.info('Enabling FRONTIER DB access')
95  from IOVDbSvc.IOVDbSvcConfig import DBReplicaSvcCfg
96  cfg.merge(DBReplicaSvcCfg(flags, vetoDBRelease=True))
97  else:
98  msg.info('Using default DB access')
99 
100  return cfg
101 
102 

Variable Documentation

◆ msg

python.TransformUtils.msg
python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.TransformUtils.DumpPickle
def DumpPickle(flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:116
python.TransformUtils.UseFrontier
def UseFrontier(flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:79
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
python.TransformUtils.ExecCondAlgsAtPreFork
def ExecCondAlgsAtPreFork(flags, cfg)
Definition: Control/AthenaServices/python/TransformUtils.py:5
python.TransformUtils.SortInput
def SortInput(flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:122
python.TransformUtils.UseCREST
def UseCREST(flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:103
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
print
void print(char *figname, TCanvas *c1)
Definition: TRTCalib_StrawStatusPlots.cxx:26
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
python.IOVDbSvcConfig.DBReplicaSvcCfg
def DBReplicaSvcCfg(flags, vetoDBRelease=False, **kwargs)
Definition: IOVDbSvcConfig.py:18
Trk::open
@ open
Definition: BinningType.h:40
python.TransformUtils.executeFromFragment
def executeFromFragment(fragment_string, flags, cfg=None)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:6
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
str
Definition: BTagTrackIpAccessor.cxx:11