ATLAS Offline Software
Tools/PyJobTransforms/python/TransformUtils.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
import logging

# Module-level logger, named after this module, shared by the helpers below.
msg = logging.getLogger(__name__)
5 
def executeFromFragment(fragment_string, flags, cfg=None):
    """Execute a function from a pre/post include fragment.

    The fragment is a dotted path of the form ``Package.Module.Function`` (or
    ``Package.Function`` if the function is defined in ``__init__.py``).  The
    last component names the callable; everything before it is the module
    that gets imported.

    Args:
        fragment_string: dotted path of the callable.  A value ending in
            ``.py`` is treated as legacy job options: a warning is logged,
            the suffix is dropped and every ``/`` is replaced by ``.`` before
            the path is resolved.
        flags: passed as the first argument to the callable.
        cfg: optional accumulator.  When it is falsy the callable is invoked
            as ``function(flags)`` and its result is discarded.  Otherwise a
            callable with exactly one required positional argument is invoked
            as ``function(flags)`` and its result is handed to ``cfg.merge``;
            any other callable is invoked as ``function(flags, cfg)``.

    Raises:
        ValueError: if the fragment has fewer than two dot-separated parts.

    Errors raised while importing the module or looking up the attribute are
    not caught here and propagate to the caller.
    """
    # detect legacy job options in the transition period:
    if fragment_string.endswith('.py'):
        msg.warning(f'Trying to load legacy job options {fragment_string}. This should NOT be used in production!')
        fragment_string = fragment_string[:-3]
        fragment_string = fragment_string.replace('/', '.')
        msg.warning(f'Resolved to {fragment_string}')

    parts = fragment_string.split('.')
    if len(parts) < 2:
        raise ValueError('Pre/post include should be of the form Package.Module.Function or Package.Function if defined in __init__.py')

    function = parts[-1]
    module = '.'.join(parts[:-1])

    from importlib import import_module
    loaded_module = import_module(module)
    function_def = getattr(loaded_module, function)

    # Pre-include style: there is nothing to merge into, just run the function.
    if not cfg:
        function_def(flags)
        return

    # Decide the calling convention from the number of *required* positional
    # arguments: one means the function builds and returns its own
    # configuration, anything else means it modifies ``cfg`` in place.
    from inspect import getfullargspec
    argspec = getfullargspec(function_def)
    argCount = len(argspec.args)
    defaultsCount = len(argspec.defaults) if argspec.defaults else 0
    if argCount - defaultsCount == 1:
        cfg.merge(function_def(flags))
    else:
        function_def(flags, cfg)
39 
40 
def processPreExec(runArgs, flags):
    """Run every preExec command found in the runtime arguments.

    Nothing is done when ``runArgs`` has no ``preExec`` attribute, or when
    that attribute is empty or equal to ``'NONE'``.  Otherwise each entry is
    passed to ``exec`` from inside this function.
    """
    if not hasattr(runArgs, 'preExec') or not runArgs.preExec or runArgs.preExec == 'NONE':
        return

    # The names bound below are not used directly in this function; they are
    # put in scope for the executed commands.  For the same reason no extra
    # local variables are introduced here.
    from AthenaConfiguration.Enums import HIMode,BeamType # noqa: F401
    ConfigFlags = flags # noqa: F841
    for cmd in runArgs.preExec:
        exec(cmd)
48 
49 
def processPostExec(runArgs, flags, cfg):
    """Run every postExec command found in the runtime arguments.

    The flags must already be locked.  Nothing further is done when
    ``runArgs`` has no ``postExec`` attribute, or when that attribute is
    empty or equal to ``'NONE'``.  Otherwise each entry is passed to ``exec``
    from inside this function.

    Raises:
        RuntimeError: if ``flags.locked()`` is false.
    """
    if not flags.locked():
        raise RuntimeError('Running a postExec before locking ConfigFlags')

    if not hasattr(runArgs, 'postExec') or not runArgs.postExec or runArgs.postExec == 'NONE':
        return

    # The names bound below are not used directly in this function; they are
    # put in scope for the executed commands.  For the same reason no extra
    # local variables are introduced here.
    from AthenaConfiguration.ComponentFactory import CompFactory # noqa: F401
    ConfigFlags = flags # noqa: F841
    for cmd in runArgs.postExec:
        exec(cmd)
60 
61 
def processPreInclude(runArgs, flags):
    """Execute every preInclude fragment found in the runtime arguments.

    Nothing is done when ``runArgs`` has no ``preInclude`` attribute, or when
    that attribute is empty or equal to ``'NONE'``.  Otherwise each fragment
    is passed, together with ``flags``, to ``executeFromFragment``.
    """
    fragments = getattr(runArgs, 'preInclude', None)
    if not fragments or fragments == 'NONE':
        return

    for include in fragments:
        executeFromFragment(include, flags)
67 
68 
def processPostInclude(runArgs, flags, cfg):
    """Execute every postInclude fragment found in the runtime arguments.

    The flags must already be locked.  Nothing further is done when
    ``runArgs`` has no ``postInclude`` attribute, or when that attribute is
    empty or equal to ``'NONE'``.  Otherwise each fragment is passed,
    together with ``flags`` and ``cfg``, to ``executeFromFragment``.

    Raises:
        RuntimeError: if ``flags.locked()`` is false.
    """
    if not flags.locked():
        raise RuntimeError('Running a postInclude before locking ConfigFlags')

    fragments = getattr(runArgs, 'postInclude', None)
    if not fragments or fragments == 'NONE':
        return

    for include in fragments:
        executeFromFragment(include, flags, cfg)
77 
78 
def UseFrontier(flags):
    """Switch database access to FRONTIER, with a fallback to DBRelease.

    When the ``FRONTIER_SERVER`` environment variable is set to a non-empty
    value, a ``DBReplicaSvc`` with ``COOLSQLiteVetoPattern='DBRelease'`` is
    added to the returned accumulator.  When it is undefined or empty (e.g.
    on HPC) the accumulator is returned empty and the default DB access is
    used.

    Moved from RecJobTransforms to PyJobTransforms to enable use in
    simulation ATN and KV jobs.

    ``flags`` is accepted for the include calling convention but is not read.
    """
    from os import environ
    from AthenaConfiguration.ComponentAccumulator import ComponentAccumulator
    from AthenaConfiguration.ComponentFactory import CompFactory

    acc = ComponentAccumulator()

    if not environ.get('FRONTIER_SERVER'):
        msg.info('Using default DB access')
        return acc

    msg.info('Enabling FRONTIER DB access')
    replica_svc = CompFactory.DBReplicaSvc(COOLSQLiteVetoPattern='DBRelease')
    acc.addService(replica_svc)
    return acc
98 
99 
def DumpPickle(flags, cfg):
    """Write the current configuration to ``Configuration.pkl``.

    The file is opened for binary writing in the current working directory
    and handed to ``cfg.store``.  ``flags`` is accepted for the include
    calling convention but is not read.
    """
    with open("Configuration.pkl", mode="wb") as pickle_file:
        cfg.store(pickle_file)
104 
105 
def SortInput(flags, cfg, *, collName=None, sortTag="LumiBlockN", sortOrder="Ascending"):
    """Make Athena read its input events in sorted order.

    Method that should be post-included after a typically configured Athena
    job.  It will read the original input files, create a list of events in
    memory, sort them and produce an intermediate event collection file that
    Athena will read instead of the original inputs.  Event information is
    read from EventInfoTags (stored by default in all Athena data files).
    The intermediate collection file can be inspected using the CollQuery
    command-line utility.

    Args:
        flags: accepted for the include calling convention; not read here.
        cfg: accumulator providing an ``EventSelector`` service whose
            ``InputCollections`` has already been set.
        collName: name of the intermediate collection (without the ``.root``
            suffix).  Defaults to ``"sortedEventRefs"`` followed by the
            current process id.
        sortTag: EventInfoTag attribute used as the sort key.
        sortOrder: sort order passed to the sorter.

    Returns:
        The same ``cfg`` object, with ``EventSelector`` pointing at the
        sorted collection.
    """
    import os
    import subprocess

    from CollectionUtilities.SortedCollectionCreator import SortedCollectionCreator

    selector = cfg.getService("EventSelector")
    inputs = selector.InputCollections

    # The sort parameters used to be looked up in locals(), which inside a
    # function never contains them, so the defaults were always used.  They
    # are keyword-only parameters now; the defaults are unchanged.
    if collName is None:
        collName = "sortedEventRefs" + str(os.getpid())

    sorter = SortedCollectionCreator(name="SortEvents")
    # Sort Inputs based on one of the EventInfoTag attributes
    # Store sorted event collection in a temporary file
    # This should run as postInclude, so we assume EventSelector.InputCollections is set earlier
    sorter.execute(inputs, outputCollection=collName, sortAttribute=sortTag, sortOrder=sortOrder)

    # Reading Events through References require a populated FileCatalog.
    # The file name is passed as its own argument (no shell), so names with
    # spaces or shell metacharacters are handled correctly.  Registration
    # stays best-effort: failures are reported but do not abort the job.
    for inpfile in inputs:
        try:
            result = subprocess.run(['pool_insertFileToCatalog', inpfile], check=False)
        except OSError as err:
            msg.warning('Could not run pool_insertFileToCatalog for %s: %s', inpfile, err)
            continue
        if result.returncode != 0:
            msg.warning('pool_insertFileToCatalog returned %s for %s', result.returncode, inpfile)

    # Tell Athena to use the sorted collection instead of the original inputs
    selector.InputCollections = [collName + ".root"]
    selector.CollectionType = "ExplicitROOT"
    return cfg
python.TransformUtils.processPreExec
def processPreExec(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:41
python.JetAnalysisCommon.ComponentAccumulator
ComponentAccumulator
Definition: JetAnalysisCommon.py:302
vtune_athena.format
format
Definition: vtune_athena.py:14
python.TransformUtils.processPostExec
def processPostExec(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:50
python.TransformUtils.DumpPickle
def DumpPickle(flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:100
python.TransformUtils.UseFrontier
def UseFrontier(flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:79
python.TransformUtils.processPostInclude
def processPostInclude(runArgs, flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:69
python.TransformUtils.processPreInclude
def processPreInclude(runArgs, flags)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:62
python.TransformUtils.SortInput
def SortInput(flags, cfg)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:106
LArG4FSStartPointFilter.exec
exec
Definition: LArG4FSStartPointFilter.py:103
TCS::join
std::string join(const std::vector< std::string > &v, const char c=',')
Definition: Trigger/TrigT1/L1Topo/L1TopoCommon/Root/StringUtils.cxx:10
Trk::open
@ open
Definition: BinningType.h:40
python.TransformUtils.executeFromFragment
def executeFromFragment(fragment_string, flags, cfg=None)
Definition: Tools/PyJobTransforms/python/TransformUtils.py:6
Muon::print
std::string print(const MuPatSegment &)
Definition: MuonTrackSteering.cxx:28
get
T * get(TKey *tobj)
get a TObject* from a TKey* (why can't a TObject be a TKey?)
Definition: hcg.cxx:127
str
Definition: BTagTrackIpAccessor.cxx:11