ATLAS Offline Software
Loading...
Searching...
No Matches
python.TransformUtils Namespace Reference

Functions

 ExecCondAlgsAtPreFork (flags, cfg)
 executeFromFragment (fragment_string, flags, cfg=None)
 processPreExec (runArgs, flags)
 processPostExec (runArgs, flags, cfg)
 processPreInclude (runArgs, flags)
 processPostInclude (runArgs, flags, cfg)
 UseFrontier (flags)
 UseCREST (flags)
 DumpPickle (flags, cfg)
 SortInput (flags, cfg)

Variables

 msg = logging.getLogger(__name__)

Function Documentation

◆ DumpPickle()

python.TransformUtils.DumpPickle ( flags,
cfg )
Dump the pickle file for the current configuration

Definition at line 116 of file Tools/PyJobTransforms/python/TransformUtils.py.

116def DumpPickle(flags, cfg):
117 """Dump the pickle file for the current configuration"""
118 with open("Configuration.pkl", "wb") as f:
119 cfg.store(f)
120
121

◆ ExecCondAlgsAtPreFork()

python.TransformUtils.ExecCondAlgsAtPreFork ( flags,
cfg )
 Execute the entire conditions sequence during the PreFork step.
This allows saving some memory in AthenaMP jobs, which is particularly
useful in the derivation production.

Definition at line 5 of file Control/AthenaServices/python/TransformUtils.py.

5def ExecCondAlgsAtPreFork(flags, cfg):
6 """ Execute the entire conditions sequence during the PreFork step.
7 This allows saving some memory in AthenaMP jobs, which is particularly
8 useful in the derivation production.
9 """
10 log = logging.getLogger('ExecCondAlgsAtPreFork')
11 try:
12 cfg.getService('AthMpEvtLoopMgr').ExecAtPreFork=['AthCondSeq']
13 log.info('AthCondSeq will be executed during PreFork')
14 except ConfigurationError: # in this context this might not be an error
15 log.info('AthCondSeq will NOT be executed during PreFork')

◆ executeFromFragment()

python.TransformUtils.executeFromFragment ( fragment_string,
flags,
cfg = None )
Execute a function from a pre/post include fragment.

Definition at line 6 of file Tools/PyJobTransforms/python/TransformUtils.py.

6def executeFromFragment(fragment_string, flags, cfg=None):
7 """Execute a function from a pre/post include fragment."""
8 # detect legacy job options in the transition period:
9 if fragment_string.endswith('.py'):
10 msg.warning(f'Trying to load legacy job options {fragment_string}. This should NOT be used in production!')
11 fragment_string = fragment_string[:-3]
12 fragment_string = fragment_string.replace('/', '.')
13 msg.warning(f'Resolved to {fragment_string}')
14
15 parts = fragment_string.split('.')
16 if len(parts) < 2:
17 raise ValueError('Pre/post include should be of the form Package.Module.Function or Package.Function if defined in __init__.py')
18
19 function = parts[-1]
20 module = '.'.join(parts[:-1])
21
22 from importlib import import_module
23 loaded_module = import_module(module)
24 function_def = getattr(loaded_module, function)
25
26 if not cfg:
27 function_def(flags)
28 return
29
30 from inspect import getfullargspec
31 argspec = getfullargspec(function_def)
32 print(argspec)
33 argCount = len(argspec.args)
34 defaultsCount = len(argspec.defaults) if argspec.defaults else 0
35 if argCount - defaultsCount == 1:
36 cfg.merge(function_def(flags))
37 else:
38 function_def(flags, cfg)
39
40
void print(char *figname, TCanvas *c1)

◆ processPostExec()

python.TransformUtils.processPostExec ( runArgs,
flags,
cfg )
Process postExec from runtime arguments.

Definition at line 50 of file Tools/PyJobTransforms/python/TransformUtils.py.

50def processPostExec(runArgs, flags, cfg):
51 """Process postExec from runtime arguments."""
52 if not flags.locked():
53 raise RuntimeError('Running a postExec before locking ConfigFlags')
54
55 if hasattr(runArgs, 'postExec') and runArgs.postExec and runArgs.postExec != 'NONE':
56 ConfigFlags = flags # noqa: F841
57 from AthenaConfiguration.ComponentFactory import CompFactory # noqa: F401
58 for cmd in runArgs.postExec:
59 exec(cmd)
60
61

◆ processPostInclude()

python.TransformUtils.processPostInclude ( runArgs,
flags,
cfg )
Process postInclude from runtime arguments.

Definition at line 69 of file Tools/PyJobTransforms/python/TransformUtils.py.

69def processPostInclude(runArgs, flags, cfg):
70 """Process postInclude from runtime arguments."""
71 if not flags.locked():
72 raise RuntimeError('Running a postInclude before locking ConfigFlags')
73
74 if hasattr(runArgs, 'postInclude') and runArgs.postInclude and runArgs.postInclude != 'NONE':
75 for fragment in runArgs.postInclude:
76 executeFromFragment(fragment, flags, cfg)
77
78

◆ processPreExec()

python.TransformUtils.processPreExec ( runArgs,
flags )
Process preExec from runtime arguments.

Definition at line 41 of file Tools/PyJobTransforms/python/TransformUtils.py.

41def processPreExec(runArgs, flags):
42 """Process preExec from runtime arguments."""
43 if hasattr(runArgs, 'preExec') and runArgs.preExec and runArgs.preExec != 'NONE':
44 ConfigFlags = flags # noqa: F841
45 from AthenaConfiguration.Enums import HIMode,BeamType # noqa: F401
46 for cmd in runArgs.preExec:
47 exec(cmd)
48
49

◆ processPreInclude()

python.TransformUtils.processPreInclude ( runArgs,
flags )
Process preInclude from runtime arguments.

Definition at line 62 of file Tools/PyJobTransforms/python/TransformUtils.py.

62def processPreInclude(runArgs, flags):
63 """Process preInclude from runtime arguments."""
64 if hasattr(runArgs, 'preInclude') and runArgs.preInclude and runArgs.preInclude != 'NONE':
65 for fragment in runArgs.preInclude:
66 executeFromFragment(fragment, flags)
67
68

◆ SortInput()

python.TransformUtils.SortInput ( flags,
cfg )
Method that should be post-included after a typicaly configured Athena job
It will read original input files, create a list of Events in memory, sort them and produce
an intermediate Event Collection file that Athena will read instead of the original inputs
Event information is read from EventInfoTags (stored by default in all Athena data files)
The default sort key value (Lumi) can be oveerriden, as the sorting order
The intermediate Collection file can be inspected using CollQuery cmdline utility

Definition at line 122 of file Tools/PyJobTransforms/python/TransformUtils.py.

122def SortInput(flags, cfg):
123 """Method that should be post-included after a typicaly configured Athena job
124 It will read original input files, create a list of Events in memory, sort them and produce
125 an intermediate Event Collection file that Athena will read instead of the original inputs
126 Event information is read from EventInfoTags (stored by default in all Athena data files)
127 The default sort key value (Lumi) can be oveerriden, as the sorting order
128 The intermediate Collection file can be inspected using CollQuery cmdline utility"""
129 import os
130 inputs = cfg.getService("EventSelector").InputCollections
131
132 # set default sort parameters, read overrides from locals()
133 tmpCollFile = locals().get("AthenaInputSortCollName", "sortedEventRefs" + str(os.getpid()) )
134 sortTag = locals().get("AthenaInputSortTag", "LumiBlockN")
135 sortOrd = locals().get("AthenaInputSortOrder", "Ascending")
136
137 from CollectionSvc.SortedCollectionCreator import SortedCollectionCreator
138 sorter = SortedCollectionCreator(name="SortEvents")
139 # Sort Inputs based on one of the EventInfoTag attributes
140 # Store sorted event collection in a temporary file
141 # This should run as postInclude, so we assume EventSelector.InputCollections is set earlier
142
143 # moved execution to a subprocess, because Gaudi messaging created by collections causes
144 # AppManager errors
145 rc = sorter.executeInSubprocess(inputs, outputCollection=tmpCollFile, sortAttribute=sortTag, sortOrder=sortOrd)
146 if rc != 0:
147 msg.error(f"Sorting failed with exit code: {rc}")
148
149 # Reading Events through References require a populated FileCatalog
150 for inpfile in inputs:
151 os.system('pool_insertFileToCatalog {}'.format(inpfile))
152
153 # Tell Athena to use the sorted collection instead of the original inputs
154 cfg.getService("EventSelector").InputCollections = [tmpCollFile + ".root"]
155 cfg.getService("EventSelector").CollectionType = "RootCollection"
156 return cfg
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition hcg.cxx:130

◆ UseCREST()

python.TransformUtils.UseCREST ( flags)
PreInclude to switch to using CREST rather than COOL

Definition at line 103 of file Tools/PyJobTransforms/python/TransformUtils.py.

103def UseCREST(flags):
104 """PreInclude to switch to using CREST rather than COOL
105 """
106 flags.IOVDb.UseCREST = True
107 from os import environ
108 msg.info('Enabling CREST DB access')
109 if environ.get('CREST_SERVER'):
110 flags.IOVDb.CrestServer = environ.get('CREST_SERVER')
111 else:
112 msg.info('CREST_SERVER environment variable not defined - using fall-back.')
113 msg.info(f'Using CrestServer: {flags.IOVDb.CrestServer}')
114
115

◆ UseFrontier()

python.TransformUtils.UseFrontier ( flags)
Switch database to using FRONTIER, but with a fallback
to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)

Move from RecJobTransforms to PyJobTransforms to enable use
in simulation ATN and KV jobs

Definition at line 79 of file Tools/PyJobTransforms/python/TransformUtils.py.

79def UseFrontier(flags):
80 """Switch database to using FRONTIER, but with a fallback
81 to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)
82
83 Move from RecJobTransforms to PyJobTransforms to enable use
84 in simulation ATN and KV jobs
85 """
86 from os import environ
87 from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
88
89 if flags.IOVDb.UseCREST:
90 raise RuntimeError('Using PyJobTransforms.UseCREST (setting flags.IOVDb.UseCREST = True) and PyJobTransforms.UseFrontier in the same job is not supported!')
91
92 cfg = ComponentAccumulator()
93 if environ.get('FRONTIER_SERVER'):
94 msg.info('Enabling FRONTIER DB access')
95 from IOVDbSvc.IOVDbSvcConfig import DBReplicaSvcCfg
96 cfg.merge(DBReplicaSvcCfg(flags, vetoDBRelease=True))
97 else:
98 msg.info('Using default DB access')
99
100 return cfg
101
102

Variable Documentation

◆ msg

python.TransformUtils.msg = logging.getLogger(__name__)