ATLAS Offline Software
trfJobOptions.py
Go to the documentation of this file.
1 # Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2 
3 
9 
10 import os
11 import time
12 
13 import logging
14 msg = logging.getLogger(__name__)
15 
16 import PyJobTransforms.trfArgClasses as trfArgClasses
17 import PyJobTransforms.trfExceptions as trfExceptions
18 from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
19 from PyJobTransforms.trfExitCodes import trfExit
20 
21 from PyJobTransforms.trfUtils import findFile
22 
23 
24 
class RunArguments(object):
    """Dynamic class that holds the run arguments as named members with values.

    Attributes are added freely (the generated runargs file assigns them);
    ``str()`` renders every attribute whose name does not start with a double
    underscore, one per line, in ``dir()`` order.
    """
    def __str__(self):
        # One "<linesep> name = repr(value)" entry per public attribute
        lines = [
            '{0} {1} = {2!r}'.format(os.linesep, attrName, getattr(self, attrName))
            for attrName in dir(self)
            if not attrName.startswith('__')
        ]
        return 'RunArguments:' + ''.join(lines)
34 
35 
36 
class JobOptionsTemplate(object):
    """Generate the runArgs job options (AKA runargs) python file for an athena executor.

    The generated file creates a ``RunArguments`` object and fills it with:
    the executor's runarg-flagged arguments; input/output file information;
    extra, runtime and literal runargs; forced data values; AthenaMP options;
    and executor step bookkeeping. In ComponentAccumulator (CA) mode it also
    writes threading runargs and then imports the CA skeleton and calls its
    ``fromRunArgs``.
    """

    def __init__(self, exe, version=None, runArgsName='runArgs'):
        """Initialise the job options template.

        @param exe Associated athena executor; its ``name`` determines the runargs file name
        @param version Version string written into the runargs file header
        @param runArgsName Name of the run arguments object in the generated file
        """
        self._exe = exe
        self._version = version
        self._runArgsName = runArgsName
        self._runArgsFile = 'runargs.' + self._exe.name + '.py'

    def consistencyCheck(self):
        """Check that the executor supplies a skeleton for its configuration mode.

        @raises trfExceptions.TransformExecutionException if CA is enabled but
        ``_skeletonCA`` is None, or CA is disabled but the legacy ``_skeleton`` is None
        """
        if self._exe._isCAEnabled():
            skeleton = self._exe._skeletonCA
            errMsg = "ComponentAccumulator requested but this transform does not supply a ComponentAccumulator-based skeleton file"
        else:
            skeleton = self._exe._skeleton
            errMsg = "Legacy configuration requested but this transform does not supply a legacy skeleton file"
        if skeleton is None:
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'), errMsg)

    def writeRunArgs(self, input = None, output = None):
        """Write the runArgs job options file ``self._runArgsFile``.

        @param input Mapping of data type to its input file argument; a value may
        also be a list indexed by ``conf.executorStep``. None means no inputs.
        @param output Mapping of data type to its output file argument. None means no outputs.
        @raises trfExceptions.TransformExecutionException if the file cannot be
        opened or written, or if AthenaMP event-order reading is requested but
        the event orders file does not exist
        """
        # None defaults instead of dict() so no mutable default is shared between calls
        if input is None:
            input = {}
        if output is None:
            output = {}
        msg.info('Writing runArgs to file \"%s\"', self._runArgsFile)
        caEnabled = self._exe._isCAEnabled()

        try:
            # open() is inside the try so that failing to create/close the file is
            # reported as a transform error as well, not only failures while writing
            with open(self._runArgsFile, 'w') as runargsFile:
                self._writeHeader(runargsFile, caEnabled)
                declaredRunargs = self._writeArgdictRunargs(runargsFile)
                self._writeFileRunargs(runargsFile, input, output)
                self._writeExtraRunargs(runargsFile, declaredRunargs)
                self._writeDataArgs(runargsFile)
                if self._exe._athenaMP:
                    self._writeAthenaMPOptions(runargsFile, caEnabled)
                self._writeExecutorFlags(runargsFile)
                if caEnabled:
                    self._writeCAConfiguration(runargsFile)
        except OSError as e:
            errMsg = 'Got an error when writing JO template {0}: {1}'.format(self._runArgsFile, e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'), errMsg) from e
        msg.info('Successfully wrote runargs file %s', self._runArgsFile)

    def _writeRunarg(self, runargsFile, name, value, log=False):
        """Write one ``<runArgsName>.<name> = repr(value)`` line, optionally logging it at debug level."""
        if log:
            msg.debug('Adding runarg %s=%r', name, value)
        print('{0}.{1!s} = {2!r}'.format(self._runArgsName, name, value), file=runargsFile)

    def _writeHeader(self, runargsFile, caEnabled):
        """Write the file header, the RunArguments import/instantiation and the substep name."""
        if caEnabled:
            # Write a shebang to identify CA files.
            print("#!/usr/bin/env athena.py", file=runargsFile)
        print(os.linesep.join(("# Run arguments file auto-generated on {0} by:".format(time.asctime()),
                               "# JobTransform: {0}".format(self._exe.name),
                               "# Version: {0}".format(self._version)
                               )), file=runargsFile)

        # Now make sure we import the runArgs class for our job options
        print(os.linesep.join(("# Import runArgs class",
                               "from PyJobTransforms.trfJobOptions import RunArguments",
                               "{0} = RunArguments()".format(self._runArgsName)
                               )), file=runargsFile)

        # Handy to write the substep name here as it can be used as (part of) a
        # random seed in some cases
        print('{0}.trfSubstepName = {1!r}'.format(self._runArgsName, self._exe.name), os.linesep, file=runargsFile)

    def _writeArgdictRunargs(self, runargsFile):
        """Write every runarg-flagged, non-file argument of the transform argdict.

        Also writes ``maxEvents = -1`` when no maxEvents runarg was written.
        @return List of the argument names that were written
        """
        declaredRunargs = []
        for k, v in self._exe.conf.argdict.items():
            if not (isinstance(v, trfArgClasses.argument) and v.isRunarg):
                msg.debug('Argument %s is not a runarg - ignored', k)
                continue
            # File arguments are written from the input/output dictionaries instead
            if isinstance(v, trfArgClasses.argFile):
                continue

            msg.debug('Argument %s is a runarg, will be added to JO file (value %s)', k, v.value)

            if isinstance(v, trfArgClasses.argSubstep):
                # Substep arguments apply only to named executors/substeps:
                # returnMyValue() sorts out which value (if any) applies to us
                myValue = v.returnMyValue(exe = self._exe)
                if myValue is None:
                    continue
                self._writeRunarg(runargsFile, k, myValue)
                msg.debug('Added substep type argument %s as: %s', k, myValue)
            else:
                self._writeRunarg(runargsFile, k, v.value)
            declaredRunargs.append(k)

        # If maxEvents was not added, set it to -1: this avoids some strange
        # defaults that only allow 5 events to be processed
        if 'maxEvents' not in declaredRunargs:
            print(os.linesep.join(("", "# Explicitly added to process all events in this step",
                                   "{0}.maxEvents = -1".format(self._runArgsName),
                                   )), file=runargsFile)
        return declaredRunargs

    def _writeFileRunargs(self, runargsFile, inputs, outputs):
        """Write the input and output data file runargs."""
        print(os.linesep, "# Input data", file=runargsFile)
        for dataType, dataArg in inputs.items():
            fileArg = dataArg
            if isinstance(dataArg, list) and dataArg:
                # A list is indexed by executor step (presumably one file argument per step)
                fileArg = dataArg[self._exe.conf.executorStep]
            print('{0}.input{1}File = {2!r}'.format(self._runArgsName, dataType, fileArg.value), file=runargsFile)
            print('{0}.input{1}FileType = {2!r}'.format(self._runArgsName, dataType, fileArg.type), file=runargsFile)
            # Add the input event count, if we know it
            if fileArg.isCached(metadataKeys = ['nentries']):
                print('{0}.input{1}FileNentries = {2!r}'.format(self._runArgsName, dataType, fileArg.nentries), file=runargsFile)
            print("{0}.{1}FileIO = {2!r}".format(self._runArgsName, dataType, fileArg.io), file=runargsFile)

        print(os.linesep, "# Output data", file=runargsFile)
        for dataType, dataArg in outputs.items():
            # Write the _output_ filename as a string, not a list
            print('{0}.output{1}File = {2!r}'.format(self._runArgsName, dataType, dataArg.value[0]), file=runargsFile)
            print('{0}.output{1}FileType = {2!r}'.format(self._runArgsName, dataType, dataArg.type), file=runargsFile)

    def _writeExtraRunargs(self, runargsFile, declaredRunargs):
        """Write the executor's extra, runtime and literal runargs."""
        # Extra runargs are written with repr(), i.e. as constants
        print(os.linesep, "# Extra runargs", file=runargsFile)
        for k, v in self._exe._extraRunargs.items():
            if k not in declaredRunargs:
                self._writeRunarg(runargsFile, k, v, log=True)
            elif isinstance(self._exe.conf.argdict[k].value, list):
                # Already written from the argdict: extend the list rather than replace it
                msg.debug('Extending runarg %s=%r', k, v)
                print('{0}.{1!s}.extend({2!r})'.format(self._runArgsName, k, v), file=runargsFile)
            # Otherwise the argdict value is kept and this extra runarg is not written.
            # NOTE(review): argSubstep values may not be lists even when the value
            # written above (from returnMyValue) is one - confirm whether such extras
            # should be extended too.

        # Runtime runargs are str()-converted, not repr()-ed, so they can write an
        # expression evaluated when the runargs file runs. Each assignment is
        # wrapped in try/except AttributeError (for the Embedding use case).
        print(os.linesep, '# Extra runtime runargs', file=runargsFile)
        for k, v in self._exe._runtimeRunargs.items():
            msg.debug('Adding runarg %s=%r', k, v)
            print(os.linesep.join(('try:',
                                   ' {0}.{1!s} = {2!s}'.format(self._runArgsName, k, v),
                                   'except AttributeError:',
                                   ' print ("WARNING - AttributeError for {0}")'.format(k))), file=runargsFile)

        if self._exe._literalRunargs is not None:
            print(os.linesep, '# Literal runargs snippets', file=runargsFile)
            for line in self._exe._literalRunargs:
                print(line, file=runargsFile)

    def _writeDataArgs(self, runargsFile):
        """Write the forced data value arguments for the executor's requested data types."""
        dataTypes = list(self._exe._dataArgs)
        if dataTypes:
            # Section header is written once, not once per data type
            print(os.linesep, '# Forced data value arguments', file=runargsFile)
        for dataType in dataTypes:
            if dataType in self._exe.conf.dataDictionary:
                print('{0}.data{1}arg = {2!r}'.format(self._runArgsName, dataType,
                                                     self._exe.conf.dataDictionary[dataType].value), file=runargsFile)
            else:
                print('# Warning: data type "{0}" is not part of this transform'.format(dataType), file=runargsFile)

    def _writeAthenaMPOptions(self, runargsFile, caEnabled):
        """Write the AthenaMP options.

        The options ask the FileMgr to produce the requested log and report files
        and collect the workers' logfiles into the mother's. In CA mode they are
        written as runargs; otherwise as legacy AthenaMPFlags job properties.
        @raises trfExceptions.TransformExecutionException if reading event orders
        is requested but the event orders file does not exist
        """
        exe = self._exe
        argdict = exe.conf.argdict
        print(os.linesep, '# AthenaMP Options. nprocs = %d' % exe._athenaMP, file=runargsFile)
        if caEnabled:
            # do not edit flags directly in this case
            self._writeRunarg(runargsFile, 'athenaMPWorkerTopDir', exe._athenaMPWorkerTopDir)
            self._writeRunarg(runargsFile, 'athenaMPOutputReportFile', exe._athenaMPFileReport)
            self._writeRunarg(runargsFile, 'athenaMPEventOrdersFile', exe._athenaMPEventOrdersFile)
            self._writeRunarg(runargsFile, 'athenaMPCollectSubprocessLogs', True)
        else:
            print(os.linesep.join((os.linesep,
                                   'from AthenaMP.AthenaMPFlags import jobproperties as AthenaMPJobProps',
                                   'AthenaMPJobProps.AthenaMPFlags.WorkerTopDir="{0}"'.format(exe._athenaMPWorkerTopDir),
                                   'AthenaMPJobProps.AthenaMPFlags.OutputReportFile="{0}"'.format(exe._athenaMPFileReport),
                                   'AthenaMPJobProps.AthenaMPFlags.EventOrdersFile="{0}"'.format(exe._athenaMPEventOrdersFile),
                                   'AthenaMPJobProps.AthenaMPFlags.CollectSubprocessLogs=True'
                                   )), file=runargsFile)

        if exe._athenaMPStrategy:
            if caEnabled:
                self._writeRunarg(runargsFile, 'athenaMPStrategy', exe._athenaMPStrategy)
            else:
                # Beware of clobbering a non default value (a feature used by EventService)
                print('if AthenaMPJobProps.AthenaMPFlags.Strategy.isDefault():', file=runargsFile)
                print('\tAthenaMPJobProps.AthenaMPFlags.Strategy="{0}"'.format(exe._athenaMPStrategy), file=runargsFile)

        if exe._athenaMPReadEventOrders:
            if not os.path.isfile(exe._athenaMPEventOrdersFile):
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_EXEC_RUNARGS_ERROR"), "Failed to find file: {0} required by athenaMP option: --athenaMPUseEventOrders true".format(exe._athenaMPEventOrdersFile))
            if caEnabled:
                self._writeRunarg(runargsFile, 'athenaMPReadEventOrders', True)
            else:
                print('AthenaMPJobProps.AthenaMPFlags.ReadEventOrders=True', file=runargsFile)

        # Optional argdict-driven options: (runarg name, legacy AthenaMPFlags name)
        for argName, flagName in (('athenaMPEventsBeforeFork', 'EventsBeforeFork'),
                                  ('sharedWriter', 'UseSharedWriter'),
                                  ('parallelCompression', 'UseParallelCompression')):
            if argName not in argdict:
                continue
            value = argdict[argName].value
            if caEnabled:
                self._writeRunarg(runargsFile, argName, value)
            else:
                print('AthenaMPJobProps.AthenaMPFlags.{0}={1}'.format(flagName, value), file=runargsFile)

    def _writeExecutorFlags(self, runargsFile):
        """Write executor step bookkeeping: total steps and, for a specific step, its event counts/skips."""
        conf = self._exe.conf
        print(os.linesep, '# Executor flags', file=runargsFile)
        self._writeRunarg(runargsFile, 'totalExecutorSteps', conf.totalExecutorSteps, log=True)
        if conf.executorStep >= 0:
            self._writeRunarg(runargsFile, 'executorStep', conf.executorStep, log=True)
            executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._exe)
            self._writeRunarg(runargsFile, 'executorEventCounts', executorEventCounts, log=True)
            self._writeRunarg(runargsFile, 'executorEventSkips', executorEventSkips, log=True)

    def _writeCAConfiguration(self, runargsFile):
        """Write the CA threading runargs, then import the CA skeleton and call its fromRunArgs()."""
        print(os.linesep, '# Threading flags', file=runargsFile)
        # Pass the number of threads and processes
        self._writeRunarg(runargsFile, 'nprocs', self._exe._athenaMP, log=True)
        self._writeRunarg(runargsFile, 'threads', self._exe._athenaMT, log=True)
        self._writeRunarg(runargsFile, 'concurrentEvents', self._exe._athenaConcurrentEvents, log=True)
        # ComponentAccumulator based config: import the skeleton here
        print(os.linesep, '# Import skeleton and execute it', file=runargsFile)
        print('from {0} import fromRunArgs'.format(self._exe._skeletonCA), file=runargsFile)
        print('fromRunArgs({0})'.format(self._runArgsName), file=runargsFile)

    def ensureJobOptions(self):
        """Warn about the runargs file or any legacy skeleton not found on JOBOPTSEARCHPATH.

        Only emits warnings; nothing is raised for missing files.
        """
        searchPath = os.environ.get("JOBOPTSEARCHPATH")
        if searchPath is None:
            # Previously a KeyError; this method is best-effort, so warn instead
            msg.warning('JOBOPTSEARCHPATH is not set')
            searchPath = ''

        # Check the runArgs (no point searching for a None file name)
        if self._runArgsFile is None:
            msg.warning('No runArgs available')
        elif not findFile(searchPath, self._runArgsFile):
            msg.warning('Could not find runArgs file %s', self._runArgsFile)

        # Check the legacy skeleton(s)
        for skeleton in self._exe._skeleton or []:
            if not findFile(searchPath, skeleton):
                msg.warning('Could not find job options skeleton file %s', skeleton)

    def getTopOptions(self, input = None, output = None):
        """Write the runargs file and return the top-level job options to run.

        @param input Input data mapping forwarded to writeRunArgs() (None means none)
        @param output Output data mapping forwarded to writeRunArgs() (None means none)
        @return ``[runargs file]`` in CA mode (the runargs file imports the CA
        skeleton itself); otherwise the runargs file followed by the legacy skeleton(s)
        """
        # Consistency check
        self.consistencyCheck()
        # Refresh the output name from the executor's current name
        self._runArgsFile = 'runargs.' + self._exe.name + '.py'
        # First make the runArgs file
        self.writeRunArgs(input = input, output = output)
        # Make sure runArgs and skeleton are valid
        self.ensureJobOptions()
        if self._exe._isCAEnabled():
            # ComponentAccumulator based config: use only the runargs file
            return [ self._runArgsFile ]
        # Traditional athena: runargs + skeleton
        return [ self._runArgsFile ] + self._exe._skeleton
322 
vtune_athena.format
format
Definition: vtune_athena.py:14
python.trfJobOptions.JobOptionsTemplate._runArgsName
_runArgsName
Definition: trfJobOptions.py:49
PyJobTransforms.trfArgClasses
Transform argument class definitions.
python.trfJobOptions.JobOptionsTemplate._runArgsFile
_runArgsFile
Definition: trfJobOptions.py:50
PyJobTransforms.trfExitCodes
Module for transform exit codes.
python.trfExceptions.TransformExecutionException
Base class for execution exceptions.
Definition: trfExceptions.py:62
python.trfJobOptions.JobOptionsTemplate._version
_version
Definition: trfJobOptions.py:48
python.trfJobOptions.JobOptionsTemplate.__init__
def __init__(self, exe, version=None, runArgsName='runArgs')
Initialise the job options template class.
Definition: trfJobOptions.py:46
python.trfJobOptions.JobOptionsTemplate.writeRunArgs
def writeRunArgs(self, input=dict(), output=dict())
Write the runArgs Job Options file.
Definition: trfJobOptions.py:66
PyAthena::repr
std::string repr(PyObject *o)
returns the string representation of a python object equivalent of calling repr(o) in python
Definition: PyAthenaUtils.cxx:106
python.trfJobOptions.RunArguments.__str__
def __str__(self)
Definition: trfJobOptions.py:28
beamspotman.dir
string dir
Definition: beamspotman.py:623
python.trfJobOptions.JobOptionsTemplate._exe
_exe
Definition: trfJobOptions.py:47
python.trfJobOptions.JobOptionsTemplate.getTopOptions
def getTopOptions(self, input=dict(), output=dict())
Get the runArgs and skeleton joboptions, Master function.
Definition: trfJobOptions.py:307
python.trfArgClasses.argFile
File argument class.
Definition: trfArgClasses.py:522
python.trfExeStepTools.getExecutorStepEventCounts
def getExecutorStepEventCounts(executor, argdict=None)
Definition: trfExeStepTools.py:44
python.trfUtils.findFile
def findFile(pathvar, fname)
Find a named file along a colon separated PATH type variable.
Definition: trfUtils.py:37
Trk::open
@ open
Definition: BinningType.h:40
PyJobTransforms.trfUtils
Transform utility functions.
python.trfJobOptions.JobOptionsTemplate.ensureJobOptions
def ensureJobOptions(self)
Make sure skeleton file is available.
Definition: trfJobOptions.py:288
python.trfArgClasses.argSubstep
Base class for substep arguments.
Definition: trfArgClasses.py:1926
pickleTool.object
object
Definition: pickleTool.py:30
dbg::print
void print(std::FILE *stream, std::format_string< Args... > fmt, Args &&... args)
Definition: SGImplSvc.cxx:70
python.trfJobOptions.JobOptionsTemplate.consistencyCheck
def consistencyCheck(self)
Check for skeleton consistency.
Definition: trfJobOptions.py:53
python.trfJobOptions.RunArguments
Hold run arguments as name-value pairs.
Definition: trfJobOptions.py:26
python.trfJobOptions.JobOptionsTemplate
Class that generates the job options (AKA runargs) python file for an athena executor.
Definition: trfJobOptions.py:38
python.trfArgClasses.argument
Basic argument class holding a value which can be get and set.
Definition: trfArgClasses.py:102