ATLAS Offline Software
Loading...
Searching...
No Matches
python.TransformUtils Namespace Reference

Functions

 ExecCondAlgsAtPreFork (flags, cfg)
 executeFromFragment (fragment_string, flags, cfg=None)
 processPreExec (runArgs, flags)
 processPostExec (runArgs, flags, cfg)
 processPreInclude (runArgs, flags)
 processPostInclude (runArgs, flags, cfg)
 UseFrontier (flags)
 UseCREST (flags)
 DumpPickle (flags, cfg)
 SortInput (flags, cfg)

Variables

 msg = logging.getLogger(__name__)

Function Documentation

◆ DumpPickle()

python.TransformUtils.DumpPickle ( flags,
cfg )
Dump the pickle file for the current configuration

Definition at line 119 of file Tools/PyJobTransforms/python/TransformUtils.py.

def DumpPickle(flags, cfg):
    """Write the current configuration to ``Configuration.pkl``.

    The file is opened in binary write mode in the current working
    directory and handed to ``cfg.store``.

    Args:
        flags: Unused by this function.
        cfg: Object exposing a ``store(file)`` method that writes itself to
            the given binary file object.
    """
    pickle_name = "Configuration.pkl"
    with open(pickle_name, "wb") as pickle_file:
        cfg.store(pickle_file)

◆ ExecCondAlgsAtPreFork()

python.TransformUtils.ExecCondAlgsAtPreFork ( flags,
cfg )
 Execute the entire conditions sequence during the PreFork step.
This allows saving some memory in AthenaMP jobs, which is particularly
useful in the derivation production.

Definition at line 5 of file Control/AthenaServices/python/TransformUtils.py.

def ExecCondAlgsAtPreFork(flags, cfg):
    """Ask AthenaMP to run the whole conditions sequence at PreFork.

    Running 'AthCondSeq' before forking allows saving some memory in
    AthenaMP jobs, which is particularly useful in the derivation
    production.

    Args:
        flags: Unused by this function.
        cfg: Configuration object; its 'AthMpEvtLoopMgr' service gets
            ``ExecAtPreFork`` set to ``['AthCondSeq']``.
    """
    logger = logging.getLogger('ExecCondAlgsAtPreFork')
    try:
        evt_loop_mgr = cfg.getService('AthMpEvtLoopMgr')
        evt_loop_mgr.ExecAtPreFork = ['AthCondSeq']
    except ConfigurationError:
        # In this context a ConfigurationError is not necessarily an error,
        # so it is only reported.
        logger.info('AthCondSeq will NOT be executed during PreFork')
    else:
        logger.info('AthCondSeq will be executed during PreFork')

◆ executeFromFragment()

python.TransformUtils.executeFromFragment ( fragment_string,
flags,
cfg = None )
Execute a function from a pre/post include fragment.

Definition at line 6 of file Tools/PyJobTransforms/python/TransformUtils.py.

def executeFromFragment(fragment_string, flags, cfg=None):
    """Execute a function from a pre/post include fragment.

    The fragment names a callable as ``Package.Module.Function`` (or
    ``Package.Function`` if it is defined in ``__init__.py``). Everything
    before the last dot is imported as a module and the last component is
    looked up on it.

    A fragment ending in ``.py`` is treated as legacy job options: the
    suffix is dropped, ``/`` is turned into ``.`` and warnings are logged.

    Call convention:
      * without ``cfg`` the function is called as ``function(flags)``;
      * with ``cfg`` and a function that has exactly one argument without a
        default, ``function(flags)`` is called and its result is merged
        into ``cfg``;
      * otherwise the function is called as ``function(flags, cfg)``.

    Args:
        fragment_string: Dotted name of the function to run.
        flags: Passed as first argument to the function.
        cfg: Optional configuration object providing ``merge``.

    Raises:
        ValueError: If the fragment does not contain a module part.
    """
    # detect legacy job options in the transition period:
    if fragment_string.endswith('.py'):
        msg.warning(f'Trying to load legacy job options {fragment_string}. This should NOT be used in production!')
        fragment_string = fragment_string[:-3].replace('/', '.')
        msg.warning(f'Resolved to {fragment_string}')

    # Split on the last dot: module path on the left, function on the right.
    module, separator, function = fragment_string.rpartition('.')
    if not separator:
        raise ValueError('Pre/post include should be of the form Package.Module.Function or Package.Function if defined in __init__.py')

    from importlib import import_module
    loaded_module = import_module(module)
    function_def = getattr(loaded_module, function)

    if not cfg:
        function_def(flags)
        return

    from inspect import getfullargspec
    argspec = getfullargspec(function_def)
    # Diagnostic output goes through the logger rather than stdout.
    msg.debug('Argument specification of %s: %s', fragment_string, argspec)
    argCount = len(argspec.args)
    defaultsCount = len(argspec.defaults) if argspec.defaults else 0
    if argCount - defaultsCount == 1:
        # Function only needs flags: it returns something to be merged.
        cfg.merge(function_def(flags))
    else:
        # Function takes the configuration object directly.
        function_def(flags, cfg)
void print(char *figname, TCanvas *c1)

◆ processPostExec()

python.TransformUtils.processPostExec ( runArgs,
flags,
cfg )
Process postExec from runtime arguments.

Definition at line 50 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPostExec(runArgs, flags, cfg):
    """Run the postExec commands given in the runtime arguments.

    Each entry of ``runArgs.postExec`` is passed to ``exec`` inside this
    function, so the commands can refer to the local names ``flags``,
    ``cfg``, ``runArgs``, ``ConfigFlags`` and ``CompFactory``.

    Args:
        runArgs: Runtime arguments; ``postExec`` is optional.
        flags: Configuration flags; must already be locked.
        cfg: Configuration object made visible to the commands.

    Raises:
        RuntimeError: If ``flags`` is not locked yet.
    """
    if not flags.locked():
        raise RuntimeError('Running a postExec before locking ConfigFlags')

    # Nothing to do when postExec is absent, empty or set to 'NONE'.
    if not hasattr(runArgs, 'postExec') or not runArgs.postExec or runArgs.postExec == 'NONE':
        return

    # Convenience names for the executed commands.
    ConfigFlags = flags  # noqa: F841
    from AthenaConfiguration.ComponentFactory import CompFactory  # noqa: F401

    # NOTE: exec runs arbitrary code taken from the runtime arguments.
    for cmd in runArgs.postExec:
        exec(cmd)

◆ processPostInclude()

python.TransformUtils.processPostInclude ( runArgs,
flags,
cfg )
Process postInclude from runtime arguments.

Definition at line 69 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPostInclude(runArgs, flags, cfg):
    """Run the postInclude fragments given in the runtime arguments.

    Every entry of ``runArgs.postInclude`` is handed to
    ``executeFromFragment`` together with ``flags`` and ``cfg``.

    Args:
        runArgs: Runtime arguments; ``postInclude`` is optional.
        flags: Configuration flags; must already be locked.
        cfg: Configuration object passed on to each fragment.

    Raises:
        RuntimeError: If ``flags`` is not locked yet.
    """
    if not flags.locked():
        raise RuntimeError('Running a postInclude before locking ConfigFlags')

    fragments = getattr(runArgs, 'postInclude', None)
    # Absent, empty or 'NONE' means there is nothing to include.
    if not fragments or fragments == 'NONE':
        return

    for fragment in fragments:
        executeFromFragment(fragment, flags, cfg)

◆ processPreExec()

python.TransformUtils.processPreExec ( runArgs,
flags )
Process preExec from runtime arguments.

Definition at line 41 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPreExec(runArgs, flags):
    """Run the preExec commands given in the runtime arguments.

    Each entry of ``runArgs.preExec`` is passed to ``exec`` inside this
    function, so the commands can refer to the local names ``flags``,
    ``runArgs``, ``ConfigFlags``, ``HIMode`` and ``BeamType``.

    Args:
        runArgs: Runtime arguments; ``preExec`` is optional.
        flags: Configuration flags made visible to the commands.
    """
    # Nothing to do when preExec is absent, empty or set to 'NONE'.
    if not hasattr(runArgs, 'preExec') or not runArgs.preExec or runArgs.preExec == 'NONE':
        return

    # Convenience names for the executed commands.
    ConfigFlags = flags  # noqa: F841
    from AthenaConfiguration.Enums import HIMode, BeamType  # noqa: F401

    # NOTE: exec runs arbitrary code taken from the runtime arguments.
    for cmd in runArgs.preExec:
        exec(cmd)

◆ processPreInclude()

python.TransformUtils.processPreInclude ( runArgs,
flags )
Process preInclude from runtime arguments.

Definition at line 62 of file Tools/PyJobTransforms/python/TransformUtils.py.

def processPreInclude(runArgs, flags):
    """Run the preInclude fragments given in the runtime arguments.

    Every entry of ``runArgs.preInclude`` is handed to
    ``executeFromFragment`` together with ``flags``.

    Args:
        runArgs: Runtime arguments; ``preInclude`` is optional.
        flags: Configuration flags passed on to each fragment.
    """
    fragments = getattr(runArgs, 'preInclude', None)
    # Absent, empty or 'NONE' means there is nothing to include.
    if not fragments or fragments == 'NONE':
        return

    for fragment in fragments:
        executeFromFragment(fragment, flags)

◆ SortInput()

python.TransformUtils.SortInput ( flags,
cfg )
Method that should be post-included after a typically configured Athena job.
It will read the original input files, create a list of Events in memory, sort them and produce
an intermediate Event Collection file that Athena will read instead of the original inputs.
Event information is read from EventInfoTags (stored by default in all Athena data files).
The default sort key value (Lumi) can be overridden, as can the sorting order.
The intermediate Collection file can be inspected using the CollQuery command-line utility.

Definition at line 125 of file Tools/PyJobTransforms/python/TransformUtils.py.

def SortInput(flags, cfg, *, AthenaInputSortCollName=None,
              AthenaInputSortTag="LumiBlockN", AthenaInputSortOrder="Ascending"):
    """Make Athena read its input events in sorted order.

    Method that should be post-included after a typically configured Athena
    job. It reads the original input files, sorts the events and produces an
    intermediate Event Collection file that Athena will read instead of the
    original inputs. Event information is read from EventInfoTags. The
    intermediate Collection file can be inspected using the CollQuery
    command-line utility.

    This should run as postInclude, so EventSelector.InputCollections is
    assumed to be set earlier.

    Args:
        flags: Unused by this function.
        cfg: Configuration object providing the "EventSelector" and
            "PoolSvc" services.
        AthenaInputSortCollName: Name of the intermediate collection file.
            Defaults to "sortedEventRefs<pid>.root".
        AthenaInputSortTag: EventInfoTag attribute used as sort key.
        AthenaInputSortOrder: Sorting order handed to the sorter.

    Returns:
        The ``cfg`` object that was passed in, after modification.
    """
    import os
    import subprocess

    inputs = cfg.getService("EventSelector").InputCollections

    # Sort parameters. These are explicit keyword-only arguments because a
    # lookup in locals() could never see values set by the caller.
    if AthenaInputSortCollName is None:
        tmpCollFile = "sortedEventRefs" + str(os.getpid()) + ".root"
    else:
        tmpCollFile = AthenaInputSortCollName
    sortTag = AthenaInputSortTag
    sortOrd = AthenaInputSortOrder

    from CollectionSvc.SortedCollectionCreator import SortedCollectionCreator
    sorter = SortedCollectionCreator(name="SortEvents")
    # Sort Inputs based on one of the EventInfoTag attributes
    # Store sorted event collection in a temporary file

    # moved execution to a subprocess, because Gaudi messaging created by collections causes
    # AppManager errors
    from PyUtils import PoolFile
    rc = sorter.executeInSubprocess(inputs, outputCollection=tmpCollFile, outputCollectionType=PoolFile.PoolOpts.CollectionType.RootTTreeCollection, sortAttribute=sortTag, sortOrder=sortOrd)
    if rc != 0:
        msg.error(f"Sorting failed with exit code: {rc}")

    # Reading Events through References require a populated FileCatalog
    for inpfile in inputs:
        # An argument list (no shell) keeps file names containing spaces or
        # shell metacharacters intact.
        try:
            status = subprocess.run(['pool_insertFileToCatalog', str(inpfile)]).returncode
        except OSError as err:
            msg.error(f"Could not run pool_insertFileToCatalog for {inpfile}: {err}")
            continue
        if status != 0:
            msg.warning(f"pool_insertFileToCatalog failed for {inpfile} with exit code: {status}")

    cfg.getService("PoolSvc").AttemptCatalogPatch = False

    # Tell Athena to use the sorted collection instead of the original inputs
    cfg.getService("EventSelector").InputCollections = [tmpCollFile]
    cfg.getService("EventSelector").CollectionType = "RootCollection"
    return cfg
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:132

◆ UseCREST()

python.TransformUtils.UseCREST ( flags)
PreInclude to switch to using CREST rather than COOL

Definition at line 105 of file Tools/PyJobTransforms/python/TransformUtils.py.

def UseCREST(flags):
    """PreInclude to switch to using CREST rather than COOL.

    Sets ``flags.IOVDb.UseCREST`` to True. If the CREST_SERVER environment
    variable is set to a non-empty value it is copied into
    ``flags.IOVDb.CrestServer``; otherwise the value already held by the
    flag is kept. A warning is logged on every call asking for this
    PreInclude to be removed.

    Args:
        flags: Configuration flags providing the ``IOVDb`` category.
    """
    from os import environ

    msg.warning('CREST is now used by default for Run4 and beyond, Please remove this PreInclude from commands!')
    flags.IOVDb.UseCREST = True
    msg.info('Enabling CREST DB access')

    crest_server = environ.get('CREST_SERVER')
    if not crest_server:
        msg.info('CREST_SERVER environment variable not defined - using fall-back.')
    else:
        flags.IOVDb.CrestServer = crest_server
    msg.info(f'Using CrestServer: {flags.IOVDb.CrestServer}')

◆ UseFrontier()

python.TransformUtils.UseFrontier ( flags)
Switch database to using FRONTIER, but with a fallback
to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)

Move from RecJobTransforms to PyJobTransforms to enable use
in simulation ATN and KV jobs

Definition at line 79 of file Tools/PyJobTransforms/python/TransformUtils.py.

def UseFrontier(flags):
    """Switch database access to FRONTIER when a server is configured.

    Falls back to the default DB access (DBRelease) if FRONTIER_SERVER is
    undefined, e.g. on HPC. Nothing is configured when
    ``flags.IOVDb.UseCREST`` is set, because combining both is not
    supported.

    Moved from RecJobTransforms to PyJobTransforms to enable use in
    simulation ATN and KV jobs.

    Args:
        flags: Configuration flags providing ``IOVDb.UseCREST``.

    Returns:
        A ComponentAccumulator, empty unless FRONTIER access was enabled.
    """
    from os import environ
    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator

    result = ComponentAccumulator()

    if flags.IOVDb.UseCREST:
        msg.warning('Using PyJobTransforms.UseCREST (setting flags.IOVDb.UseCREST = True) and PyJobTransforms.UseFrontier in the same job is not supported!')
        return result

    if not environ.get('FRONTIER_SERVER'):
        msg.info('Using default DB access')
        return result

    msg.info('Enabling FRONTIER DB access')
    from IOVDbSvc.IOVDbSvcConfig import DBReplicaSvcCfg
    result.merge(DBReplicaSvcCfg(flags, vetoDBRelease=True))
    return result

Variable Documentation

◆ msg

python.TransformUtils.msg = logging.getLogger(__name__)