ATLAS Offline Software
Loading...
Searching...
No Matches
trfJobOptions.py
Go to the documentation of this file.
1# Copyright (C) 2002-2024 CERN for the benefit of the ATLAS collaboration
2
9
10import os
11import time
12
13import logging
14msg = logging.getLogger(__name__)
15
16import PyJobTransforms.trfArgClasses as trfArgClasses
17import PyJobTransforms.trfExceptions as trfExceptions
18from PyJobTransforms.trfExeStepTools import getExecutorStepEventCounts
19from PyJobTransforms.trfExitCodes import trfExit
20
21from PyJobTransforms.trfUtils import findFile
22
23
24
class RunArguments(object):
    """Container whose attributes are the run arguments (name -> value).

    Attributes are attached dynamically by the generated runargs file; the
    string form lists every attribute not starting with a double underscore,
    one per line, in ``dir()`` order, with values shown via ``repr``.
    """

    def __str__(self):
        entries = (
            f'{os.linesep} {name} = {getattr(self, name)!r}'
            for name in dir(self)
            if not name.startswith('__')
        )
        return 'RunArguments:' + ''.join(entries)
34
35
36
class JobOptionsTemplate(object):
    """Generate the job options (AKA runargs) python file for an athena executor.

    The generated file creates a ``RunArguments`` object, fills it from the
    executor's configuration and, in ComponentAccumulator (CA) mode, imports the
    CA skeleton and calls its ``fromRunArgs``. In legacy mode the runargs file is
    returned together with the executor's skeleton file(s).
    """

    def __init__(self, exe, version=None, runArgsName='runArgs'):
        """Initialise the job options template class.

        @param exe Executor whose configuration is written out (its ``name``,
            ``conf`` and several private attributes are read).
        @param version Version string recorded in the generated file header.
        @param runArgsName Variable name of the runArgs object in the generated file.
        """
        self._exe = exe
        self._version = version
        self._runArgsName = runArgsName
        self._runArgsFile = self._runArgsFileName()

    def _runArgsFileName(self):
        """Return the runargs file name derived from the current executor name."""
        return 'runargs.' + self._exe.name + '.py'

    def consistencyCheck(self):
        """Check that the executor supplies a skeleton matching its configuration mode.

        @raise trfExceptions.TransformExecutionException (TRF_EXEC_RUNARGS_ERROR)
            if CA mode is enabled but no ``_skeletonCA`` is set, or legacy mode is
            used but no ``_skeleton`` is set.
        """
        if self._exe._isCAEnabled():
            if self._exe._skeletonCA is None:
                errMsg = "ComponentAccumulator requested but this transform does not supply a ComponentAccumulator-based skeleton file"
                msg.error(errMsg)
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'), errMsg)
        elif self._exe._skeleton is None:
            errMsg = "Legacy configuration requested but this transform does not supply a legacy skeleton file"
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'), errMsg)

    def writeRunArgs(self, input=None, output=None):
        """Write the runArgs job options file.

        @param input Mapping of data type to input file argument, or to a non-empty
            list of such arguments indexed by ``conf.executorStep``.
        @param output Mapping of data type to output file argument; only the first
            file name of each output is written.
        @raise trfExceptions.TransformExecutionException (TRF_EXEC_RUNARGS_ERROR)
            if the file cannot be opened/written, or if AthenaMP event orders are
            requested but the event orders file does not exist.
        """
        # None defaults instead of dict() so no mutable default is shared between calls.
        input = {} if input is None else input
        output = {} if output is None else output
        msg.info('Writing runArgs to file \"%s\"', self._runArgsFile)
        caEnabled = self._exe._isCAEnabled()

        # The try encloses open() (and the implicit close) as well, so any I/O
        # failure is reported as a transform error rather than a bare OSError.
        try:
            with open(self._runArgsFile, 'w') as runargsFile:
                self._writeHeader(runargsFile, caEnabled)
                declaredRunargs = self._writeCoreRunargs(runargsFile)
                self._writeDataFiles(runargsFile, input, output)
                self._writeExtraRunargs(runargsFile, declaredRunargs)
                # AthenaMP fragment: ask the FileMgr for the requested log/report files
                # and aggregate worker logs into the mother's for easier debugging.
                if self._exe._athenaMP:
                    self._writeAthenaMPOptions(runargsFile, caEnabled)
                self._writeExecutorFlags(runargsFile)
                if caEnabled:
                    self._writeCAThreadingAndSkeleton(runargsFile)
        except OSError as e:
            errMsg = 'Got an error when writing JO template {0}: {1}'.format(self._runArgsFile, e)
            msg.error(errMsg)
            raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'), errMsg) from e

        msg.info('Successfully wrote runargs file %s', self._runArgsFile)

    def _writeRunarg(self, runargsFile, name, value):
        """Log and write one ``<runArgsName>.<name> = repr(value)`` assignment."""
        msg.debug('Adding runarg %s=%r', name, value)
        print('{0}.{1!s} = {2!r}'.format(self._runArgsName, name, value), file=runargsFile)

    def _writeHeader(self, runargsFile, caEnabled):
        """Write the optional CA shebang, the provenance header, the RunArguments import and the substep name."""
        if caEnabled:
            # Write a shebang to identify CA files.
            print("#!/usr/bin/env athena.py", file=runargsFile)
        print(os.linesep.join(("# Run arguments file auto-generated on {0} by:".format(time.asctime()),
                               "# JobTransform: {0}".format(self._exe.name),
                               "# Version: {0}".format(self._version)
                               )), file=runargsFile)
        # Make sure we import the runArgs class for our job options
        print(os.linesep.join(("# Import runArgs class",
                               "from PyJobTransforms.trfJobOptions import RunArguments",
                               "{0} = RunArguments()".format(self._runArgsName)
                               )), file=runargsFile)
        # The substep name can be used as (part of) a random seed in some cases
        print('{0}.trfSubstepName = {1!r}'.format(self._runArgsName, self._exe.name), os.linesep, file=runargsFile)

    def _writeCoreRunargs(self, runargsFile):
        """Write every non-file runarg from the executor's argdict; return the set of names written."""
        declaredRunargs = set()
        for k, v in self._exe.conf.argdict.items():
            if not (isinstance(v, trfArgClasses.argument) and v.isRunarg):
                msg.debug('Argument %s is not a runarg - ignored', k)
                continue
            if isinstance(v, trfArgClasses.argFile):
                # File arguments are written from the input/output dictionaries instead
                continue
            msg.debug('Argument %s is a runarg, will be added to JO file (value %s)', k, v.value)
            if isinstance(v, trfArgClasses.argSubstep):
                # Substep arguments resolve to the value for this executor (may be None)
                myValue = v.returnMyValue(exe=self._exe)
                if myValue is not None:
                    print("{0}.{1!s} = {2!r}".format(self._runArgsName, k, myValue), file=runargsFile)
                    msg.debug('Added substep type argument %s as: %s', k, myValue)
                    declaredRunargs.add(k)
            else:
                print("{0}.{1!s} = {2!r}".format(self._runArgsName, k, v.value), file=runargsFile)
                declaredRunargs.add(k)

        # If maxEvents was not added, set it to -1, which avoids some strange
        # defaults that only allow 5 events to be processed
        if 'maxEvents' not in declaredRunargs:
            print(os.linesep.join(("", "# Explicitly added to process all events in this step",
                                   "{0}.maxEvents = -1".format(self._runArgsName),
                                   )), file=runargsFile)
        return declaredRunargs

    def _writeDataFiles(self, runargsFile, inputFiles, outputFiles):
        """Write the input and output file runargs."""
        print(os.linesep, "# Input data", file=runargsFile)
        for dataType, dataArg in inputFiles.items():
            # A non-empty list holds one argument per executor step; pick this step's
            stepArg = dataArg[self._exe.conf.executorStep] if isinstance(dataArg, list) and dataArg else dataArg
            print('{0}.input{1}File = {2!r}'.format(self._runArgsName, dataType, stepArg.value), file=runargsFile)
            print('{0}.input{1}FileType = {2!r}'.format(self._runArgsName, dataType, stepArg.type), file=runargsFile)
            # Add the input event count, if we know it
            if stepArg.isCached(metadataKeys=['nentries']):
                print('{0}.input{1}FileNentries = {2!r}'.format(self._runArgsName, dataType, stepArg.nentries), file=runargsFile)
            print("{0}.{1}FileIO = {2!r}".format(self._runArgsName, dataType, stepArg.io), file=runargsFile)

        print(os.linesep, "# Output data", file=runargsFile)
        for dataType, dataArg in outputFiles.items():
            # Write the _output_ filename as a string, not a list
            print('{0}.output{1}File = {2!r}'.format(self._runArgsName, dataType, dataArg.value[0]), file=runargsFile)
            print('{0}.output{1}FileType = {2!r}'.format(self._runArgsName, dataType, dataArg.type), file=runargsFile)

    def _writeExtraRunargs(self, runargsFile, declaredRunargs):
        """Write executor-supplied extra, runtime and literal runargs plus forced data values."""
        print(os.linesep, "# Extra runargs", file=runargsFile)
        for k, v in self._exe._extraRunargs.items():
            if k not in declaredRunargs:
                self._writeRunarg(runargsFile, k, v)
            elif isinstance(self._exe.conf.argdict[k].value, list):
                # Already written from argdict: extend the list (e.g. preExec) rather than replace it
                msg.debug('Extending runarg {0!s}={1!r}'.format(k, v))
                print('{0}.{1!s}.extend({2!r})'.format(self._runArgsName, k, v), file=runargsFile)
            else:
                # The scalar value already written from argdict is kept
                msg.debug('Runarg %s already declared and not a list: extra value %r not applied', k, v)

        print(os.linesep, '# Extra runtime runargs', file=runargsFile)
        for k, v in self._exe._runtimeRunargs.items():
            # These options are str-converted, not repr'd, so they can write an option
            # which is evaluated at runtime; protected with try/except for the Embedding use case
            msg.debug('Adding runarg %s=%r', k, v)
            print(os.linesep.join(('try:',
                                   ' {0}.{1!s} = {2!s}'.format(self._runArgsName, k, v),
                                   'except AttributeError:',
                                   ' print ("WARNING - AttributeError for {0}")'.format(k))), file=runargsFile)

        if self._exe._literalRunargs is not None:
            print(os.linesep, '# Literal runargs snippets', file=runargsFile)
            for line in self._exe._literalRunargs:
                print(line, file=runargsFile)

        if self._exe._dataArgs:
            # Section header written once, not once per data type
            print(os.linesep, '# Forced data value arguments', file=runargsFile)
            for dataType in self._exe._dataArgs:
                if dataType in self._exe.conf.dataDictionary:
                    print('{0}.data{1}arg = {2!r}'.format(self._runArgsName, dataType,
                                                          self._exe.conf.dataDictionary[dataType].value), file=runargsFile)
                else:
                    print('# Warning: data type "{0}" is not part of this transform'.format(dataType), file=runargsFile)

    def _writeAthenaMPOptions(self, runargsFile, caEnabled):
        """Write AthenaMP options: on runArgs in CA mode, as AthenaMPFlags assignments otherwise."""
        exe = self._exe
        argdict = exe.conf.argdict
        print(os.linesep, '# AthenaMP Options. nprocs = %d' % exe._athenaMP, file=runargsFile)
        if caEnabled:
            # Do not edit flags directly in this case
            self._writeRunarg(runargsFile, 'athenaMPWorkerTopDir', exe._athenaMPWorkerTopDir)
            self._writeRunarg(runargsFile, 'athenaMPOutputReportFile', exe._athenaMPFileReport)
            self._writeRunarg(runargsFile, 'athenaMPEventOrdersFile', exe._athenaMPEventOrdersFile)
            self._writeRunarg(runargsFile, 'athenaMPCollectSubprocessLogs', True)
        else:
            print(os.linesep.join((os.linesep,
                                   'from AthenaMP.AthenaMPFlags import jobproperties as AthenaMPJobProps',
                                   'AthenaMPJobProps.AthenaMPFlags.WorkerTopDir="{0}"'.format(exe._athenaMPWorkerTopDir),
                                   'AthenaMPJobProps.AthenaMPFlags.OutputReportFile="{0}"'.format(exe._athenaMPFileReport),
                                   'AthenaMPJobProps.AthenaMPFlags.EventOrdersFile="{0}"'.format(exe._athenaMPEventOrdersFile),
                                   'AthenaMPJobProps.AthenaMPFlags.CollectSubprocessLogs=True'
                                   )), file=runargsFile)

        if exe._athenaMPStrategy:
            if caEnabled:
                self._writeRunarg(runargsFile, 'athenaMPStrategy', exe._athenaMPStrategy)
            else:
                # Beware of clobbering a non default value (a feature used by EventService)
                print('if AthenaMPJobProps.AthenaMPFlags.Strategy.isDefault():', file=runargsFile)
                print('\tAthenaMPJobProps.AthenaMPFlags.Strategy="{0}"'.format(exe._athenaMPStrategy), file=runargsFile)

        if exe._athenaMPReadEventOrders:
            if not os.path.isfile(exe._athenaMPEventOrdersFile):
                raise trfExceptions.TransformExecutionException(
                    trfExit.nameToCode("TRF_EXEC_RUNARGS_ERROR"),
                    "Failed to find file: {0} required by athenaMP option: --athenaMPUseEventOrders true".format(exe._athenaMPEventOrdersFile))
            if caEnabled:
                self._writeRunarg(runargsFile, 'athenaMPReadEventOrders', True)
            else:
                print('AthenaMPJobProps.AthenaMPFlags.ReadEventOrders=True', file=runargsFile)

        if 'athenaMPEventsBeforeFork' in argdict:
            eventsBeforeFork = argdict['athenaMPEventsBeforeFork'].value
            if caEnabled:
                self._writeRunarg(runargsFile, 'athenaMPEventsBeforeFork', eventsBeforeFork)
            else:
                print('AthenaMPJobProps.AthenaMPFlags.EventsBeforeFork={0}'.format(eventsBeforeFork), file=runargsFile)

        if 'sharedWriter' in argdict:
            sharedWriter = argdict['sharedWriter'].value
            if caEnabled:
                self._writeRunarg(runargsFile, 'sharedWriter', sharedWriter)
            else:
                print(f"AthenaMPJobProps.AthenaMPFlags.UseSharedWriter={sharedWriter}", file=runargsFile)

        if 'parallelCompression' in argdict:
            parallelCompression = argdict['parallelCompression'].value
            if caEnabled:
                self._writeRunarg(runargsFile, 'parallelCompression', parallelCompression)
            else:
                print(f"AthenaMPJobProps.AthenaMPFlags.UseParallelCompression={parallelCompression}", file=runargsFile)

    def _writeExecutorFlags(self, runargsFile):
        """Write the executor step bookkeeping (total steps; per-step index, event counts and skips)."""
        conf = self._exe.conf
        print(os.linesep, '# Executor flags', file=runargsFile)
        self._writeRunarg(runargsFile, 'totalExecutorSteps', conf.totalExecutorSteps)
        if conf.executorStep >= 0:
            self._writeRunarg(runargsFile, 'executorStep', conf.executorStep)
            executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._exe)
            self._writeRunarg(runargsFile, 'executorEventCounts', executorEventCounts)
            self._writeRunarg(runargsFile, 'executorEventSkips', executorEventSkips)

    def _writeCAThreadingAndSkeleton(self, runargsFile):
        """Write CA threading runargs, then import and execute the CA skeleton's fromRunArgs."""
        print(os.linesep, '# Threading flags', file=runargsFile)
        # Pass the number of processes, threads and concurrent events
        self._writeRunarg(runargsFile, 'nprocs', self._exe._athenaMP)
        self._writeRunarg(runargsFile, 'threads', self._exe._athenaMT)
        self._writeRunarg(runargsFile, 'concurrentEvents', self._exe._athenaConcurrentEvents)
        # ComponentAccumulator based config: import the skeleton here
        print(os.linesep, '# Import skeleton and execute it', file=runargsFile)
        print('from {0} import fromRunArgs'.format(self._exe._skeletonCA), file=runargsFile)
        print('fromRunArgs({0})'.format(self._runArgsName), file=runargsFile)

    def ensureJobOptions(self):
        """Make sure the runArgs file and skeleton file(s) can be found.

        Only logs warnings for missing files. Files are looked up with ``findFile``
        on the ``JOBOPTSEARCHPATH`` environment variable, which must be set
        (a KeyError is raised otherwise, as before).
        """
        searchPath = os.environ["JOBOPTSEARCHPATH"]
        # Check the runArgs:
        if self._runArgsFile is None:
            msg.warning('No runArgs available')
        elif not findFile(searchPath, self._runArgsFile):
            # Only search when there is a file name to search for
            msg.warning('Could not find runArgs file %s', self._runArgsFile)

        # Check the skeleton(s):
        if self._exe._skeleton:
            for skeleton in self._exe._skeleton:
                if not findFile(searchPath, skeleton):
                    msg.warning('Could not find job options skeleton file %s', skeleton)

    def getTopOptions(self, input=None, output=None):
        """Get the runArgs and skeleton job options files (master function).

        @param input Input file mapping, passed to ``writeRunArgs``.
        @param output Output file mapping, passed to ``writeRunArgs``.
        @return List of runargs and skeleton files to be processed by athena
            (only the runargs file in CA mode).
        """
        self.consistencyCheck()
        # Update the output name from the current executor name
        self._runArgsFile = self._runArgsFileName()
        self.writeRunArgs(input=input, output=output)
        # Make sure runArgs and skeleton are valid
        self.ensureJobOptions()
        if self._exe._isCAEnabled():
            # ComponentAccumulator based config: the runargs file imports the skeleton itself
            return [self._runArgsFile]
        # Traditional athena: runargs + skeleton
        return [self._runArgsFile] + self._exe._skeleton
322
void print(char *figname, TCanvas *c1)
Base class for substep arguments.
Basic argument class holding a value which can be get and set.
Base class for execution exceptions.
Class that generates the job options (AKA runargs) python file for an athena executor.
writeRunArgs(self, input=dict(), output=dict())
Write the runArgs Job Options file.
consistencyCheck(self)
Check for skeleton consistency.
ensureJobOptions(self)
Make sure skeleton file is available.
getTopOptions(self, input=dict(), output=dict())
Get the runArgs and skeleton joboptions, Master function.
__init__(self, exe, version=None, runArgsName='runArgs')
Initialise the job options template class.
Hold run arguments as name-value pairs.
Transform argument class definitions.
Module for transform exit codes.
Transform utility functions.