4 msg = logging.getLogger(__name__)
7 """Execute a function from a pre/post include fragment."""
9 if fragment_string.endswith(
'.py'):
10 msg.warning(f
'Trying to load legacy job options {fragment_string}. This should NOT be used in production!')
11 fragment_string = fragment_string[:-3]
12 fragment_string = fragment_string.replace(
'/',
'.')
13 msg.warning(f
'Resolved to {fragment_string}')
15 parts = fragment_string.split(
'.')
17 raise ValueError(
'Pre/post include should be of the form Package.Module.Function or Package.Function if defined in __init__.py')
20 module =
'.'.
join(parts[:-1])
22 from importlib
import import_module
23 loaded_module = import_module(module)
24 function_def = getattr(loaded_module, function)
30 from inspect
import getfullargspec
31 argspec = getfullargspec(function_def)
33 argCount = len(argspec.args)
34 defaultsCount = len(argspec.defaults)
if argspec.defaults
else 0
35 if argCount - defaultsCount == 1:
36 cfg.merge(function_def(flags))
38 function_def(flags, cfg)
42 """Process preExec from runtime arguments."""
43 if hasattr(runArgs,
'preExec')
and runArgs.preExec
and runArgs.preExec !=
'NONE':
45 from AthenaConfiguration.Enums
import HIMode,BeamType
46 for cmd
in runArgs.preExec:
51 """Process postExec from runtime arguments."""
52 if not flags.locked():
53 raise RuntimeError(
'Running a postExec before locking ConfigFlags')
55 if hasattr(runArgs,
'postExec')
and runArgs.postExec
and runArgs.postExec !=
'NONE':
57 from AthenaConfiguration.ComponentFactory
import CompFactory
58 for cmd
in runArgs.postExec:
63 """Process preInclude from runtime arguments."""
64 if hasattr(runArgs,
'preInclude')
and runArgs.preInclude
and runArgs.preInclude !=
'NONE':
65 for fragment
in runArgs.preInclude:
70 """Process postInclude from runtime arguments."""
71 if not flags.locked():
72 raise RuntimeError(
'Running a postInclude before locking ConfigFlags')
74 if hasattr(runArgs,
'postInclude')
and runArgs.postInclude
and runArgs.postInclude !=
'NONE':
75 for fragment
in runArgs.postInclude:
80 """Switch database to using FRONTIER, but with a fallback
81 to DBRelease if FRONTIER_SERVER is undefined (e.g., on HPC)
83 Move from RecJobTransforms to PyJobTransforms to enable use
84 in simulation ATN and KV jobs
86 from os
import environ
87 from AthenaConfiguration.ComponentAccumulator
import ComponentAccumulator
88 from AthenaConfiguration.ComponentFactory
import CompFactory
91 if environ.get(
'FRONTIER_SERVER'):
92 msg.info(
'Enabling FRONTIER DB access')
93 cfg.addService(CompFactory.DBReplicaSvc(COOLSQLiteVetoPattern=
'DBRelease'))
95 msg.info(
'Using default DB access')
101 """Dump the pickle file for the current configuration"""
102 with open(
"Configuration.pkl",
"wb")
as f:
107 """Method that should be post-included after a typicaly configured Athena job
108 It will read original input files, create a list of Events in memory, sort them and produce
109 an intermediate Event Collection file that Athena will read instead of the original inputs
110 Event information is read from EventInfoTags (stored by default in all Athena data files)
111 The default sort key value (Lumi) can be oveerriden, as the sorting order
112 The intermediate Collection file can be inspected using CollQuery cmdline utility"""
114 inputs = cfg.getService(
"EventSelector").InputCollections
117 tmpCollFile = locals().
get(
"AthenaInputSortCollName",
"sortedEventRefs" +
str(os.getpid()) )
118 sortTag = locals().
get(
"AthenaInputSortTag",
"LumiBlockN")
119 sortOrd = locals().
get(
"AthenaInputSortOrder",
"Ascending")
121 from CollectionUtilities.SortedCollectionCreator
import SortedCollectionCreator
122 sorter = SortedCollectionCreator(name=
"SortEvents")
126 sorter.execute(inputs, outputCollection=tmpCollFile, sortAttribute=sortTag, sortOrder=sortOrd)
129 for inpfile
in inputs:
130 os.system(
'pool_insertFileToCatalog {}'.
format(inpfile))
133 cfg.getService(
"EventSelector").InputCollections = [tmpCollFile +
".root"]
134 cfg.getService(
"EventSelector").CollectionType =
"ExplicitROOT"