ATLAS Offline Software
Loading...
Searching...
No Matches
python.trfJobOptions.JobOptionsTemplate Class Reference

Class that generates the job options (AKA runargs) python file for an athena executor. More...

Inheritance diagram for python.trfJobOptions.JobOptionsTemplate:
Collaboration diagram for python.trfJobOptions.JobOptionsTemplate:

Public Types

typedef HLT::TypeInformation::for_each_type_c< typenameEDMLIST::map, my_functor, my_result<>, my_arg< HLT::TypeInformation::get_cont, CONTAINER > >::type result

Public Member Functions

 __init__ (self, exe, version=None, runArgsName='runArgs')
 Initialise the job options template class.
 consistencyCheck (self)
 Check for skeleton consistency.
 writeRunArgs (self, input=dict(), output=dict())
 Write the runArgs Job Options file.
 ensureJobOptions (self)
 Make sure skeleton file is available.
 getTopOptions (self, input=dict(), output=dict())
 Get the runArgs and skeleton joboptions, Master function.

Protected Attributes

 _exe = exe
 _version = version
 _runArgsName = runArgsName
str _runArgsFile = 'runargs.' + self._exe.name + '.py'

Detailed Description

Class that generates the job options (AKA runargs) python file for an athena executor.

For generating runArgs JobOptions file 

Definition at line 38 of file trfJobOptions.py.

Member Typedef Documentation

◆ result

Definition at line 90 of file EDM_MasterSearch.h.

Constructor & Destructor Documentation

◆ __init__()

python.trfJobOptions.JobOptionsTemplate.__init__ ( self,
exe,
version = None,
runArgsName = 'runArgs' )

Initialise the job options template class.

Parameters
exeAssociated athena executor
versionOptional version string
runArgsNameName of runtime argument class
Note
Almost all useful parameters for this class are part of the executor itself

Definition at line 46 of file trfJobOptions.py.

46 def __init__(self, exe, version=None, runArgsName='runArgs'):
47 self._exe = exe
48 self._version = version
49 self._runArgsName = runArgsName
50 self._runArgsFile = 'runargs.' + self._exe.name + '.py'
51

Member Function Documentation

◆ consistencyCheck()

python.trfJobOptions.JobOptionsTemplate.consistencyCheck ( self)

Check for skeleton consistency.

Definition at line 53 of file trfJobOptions.py.

53 def consistencyCheck(self):
54 if self._exe._isCAEnabled():
55 if self._exe._skeletonCA is None:
56 errMsg = "ComponentAccumulator requested but this transform does not supply a ComponentAccumulator-based skeleton file"
57 msg.error(errMsg)
58 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'),errMsg)
59 else: # not self._exe._isCAEnabled():
60 if self._exe._skeleton is None:
61 errMsg = "Legacy configuration requested but this transform does not supply a legacy skeleton file"
62 msg.error(errMsg)
63 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'),errMsg)
64

◆ ensureJobOptions()

python.trfJobOptions.JobOptionsTemplate.ensureJobOptions ( self)

Make sure skeleton file is available.

Definition at line 288 of file trfJobOptions.py.

288 def ensureJobOptions(self):
289 # Check the runArgs:
290 if self._runArgsFile is None:
291 msg.warning('No runArgs available')
292
293 if not findFile(os.environ["JOBOPTSEARCHPATH"], self._runArgsFile):
294 msg.warning('Could not find runArgs file %s', self._runArgsFile)
295
296 # Check the skeleton(s):
297 if self._exe._skeleton:
298 for skeleton in self._exe._skeleton:
299 if not findFile(os.environ["JOBOPTSEARCHPATH"], skeleton):
300 msg.warning('Could not find job options skeleton file %s', skeleton)
301
302

◆ getTopOptions()

python.trfJobOptions.JobOptionsTemplate.getTopOptions ( self,
input = dict(),
output = dict() )

Get the runArgs and skeleton joboptions, Master function.

Parameters
inputInput file list
outputOutput file list
Returns
List of runargs and skeletons to be processed by athena

Definition at line 307 of file trfJobOptions.py.

307 def getTopOptions(self, input = dict(), output = dict()):
308 # Consistency check
309 self.consistencyCheck()
310 # Update the output name
311 self._runArgsFile = 'runargs.' + self._exe.name + '.py'
312 # First Make the runArgs file:
313 self.writeRunArgs(input = input, output = output)
314 # Make sure runArgs and skeleton are valid
315 self.ensureJobOptions()
316 if self._exe._isCAEnabled():
317 #ComponentAccumulator based config, use only runargs file
318 return [ self._runArgsFile ]
319 else:
320 #Traditional athena: runargs + skeleton
321 return [ self._runArgsFile ] + self._exe._skeleton
322

◆ writeRunArgs()

python.trfJobOptions.JobOptionsTemplate.writeRunArgs ( self,
input = dict(),
output = dict() )

Write the runArgs Job Options file.

Definition at line 66 of file trfJobOptions.py.

66 def writeRunArgs(self, input = dict(), output = dict()):
67 msg.info('Writing runArgs to file \"%s\"', self._runArgsFile)
68
69 with open(self._runArgsFile, 'w') as runargsFile:
70 try:
71 if self._exe._isCAEnabled():
72 # Write a shebang to identify CA files.
73 print("#!/usr/bin/env athena.py", file=runargsFile)
74 # First write a little header
75 print(os.linesep.join(("# Run arguments file auto-generated on {0} by:".format(time.asctime()),
76 "# JobTransform: {0}".format(self._exe.name),
77 "# Version: {0}".format(self._version)
78 )), file=runargsFile)
79
80 # Now make sure we import the runArgs class for out job options
81 print(os.linesep.join(("# Import runArgs class",
82 "from PyJobTransforms.trfJobOptions import RunArguments",
83 "{0} = RunArguments()".format(self._runArgsName)
84 )), file=runargsFile)
85
86 # Handy to write the substep name here as it can be used as (part of) a random seed
87 # in some cases
88 print('{0}.trfSubstepName = {1!r}'.format(self._runArgsName, self._exe.name), os.linesep, file=runargsFile)
89
90 # Now loop over the core argdict and see what needs to be given as a runArg
91 declaredRunargs = []
92 for k, v in self._exe.conf.argdict.items():
93 # Check if this arg is supposed to be in runArgs
94 if isinstance(v, trfArgClasses.argument) and v.isRunarg:
95 # Files handled later
96 if isinstance(v, trfArgClasses.argFile):
97 continue
98
99 msg.debug('Argument {0} is a runarg, will be added to JO file (value {1})'.format(k, v.value))
100
101
104 if isinstance(v, trfArgClasses.argSubstep):
105 myValue = v.returnMyValue(exe = self._exe)
106 if myValue is not None:
107 print("{0}.{1!s} = {2!r}".format(self._runArgsName, k, myValue), file=runargsFile)
108 msg.debug('Added substep type argument {0} as: {1}'.format(k, myValue))
109 declaredRunargs.append(k)
110 else:
111 print("{0}.{1!s} = {2!r}".format(self._runArgsName, k, v.value), file=runargsFile)
112 declaredRunargs.append(k)
113 else:
114 msg.debug('Argument {0} is not a runarg - ignored'.format(k))
115
116 # Now make sure that if we did not add maxEvents then we set this to -1, which
117 # avoids some strange defaults that only allow 5 events to be processed
118 if 'maxEvents' not in declaredRunargs:
119 print(os.linesep.join(("", "# Explicitly added to process all events in this step",
120 "{0}.maxEvents = -1".format(self._runArgsName),
121 )), file=runargsFile)
122
123 # Now deal with our input and output files
124 print(os.linesep, "# Input data", file=runargsFile)
125 for dataType, dataArg in input.items():
126 if isinstance(dataArg, list) and dataArg:
127 dataArgStep = dataArg[self._exe.conf.executorStep]
128 print('{0}.input{1}File = {2!r}'.format(self._runArgsName, dataType, dataArgStep.value), file=runargsFile)
129 print('{0}.input{1}FileType = {2!r}'.format(self._runArgsName, dataType, dataArgStep.type), file=runargsFile)
130 # Add the input event count, if we know it
131 if dataArgStep.isCached(metadataKeys = ['nentries']):
132 print('{0}.input{1}FileNentries = {2!r}'.format(self._runArgsName, dataType, dataArgStep.nentries), file=runargsFile)
133 print("{0}.{1}FileIO = {2!r}".format(self._runArgsName, dataType, dataArgStep.io), file=runargsFile)
134 else:
135 print('{0}.input{1}File = {2!r}'.format(self._runArgsName, dataType, dataArg.value), file=runargsFile)
136 print('{0}.input{1}FileType = {2!r}'.format(self._runArgsName, dataType, dataArg.type), file=runargsFile)
137 # Add the input event count, if we know it
138 if dataArg.isCached(metadataKeys = ['nentries']):
139 print('{0}.input{1}FileNentries = {2!r}'.format(self._runArgsName, dataType, dataArg.nentries), file=runargsFile)
140 print("{0}.{1}FileIO = {2!r}".format(self._runArgsName, dataType, dataArg.io), file=runargsFile)
141
142 print(os.linesep, "# Output data", file=runargsFile)
143 for dataType, dataArg in output.items():
144 # Need to be careful to convert _output_ filename as a strings, not a list
145 print('{0}.output{1}File = {2!r}'.format(self._runArgsName, dataType, dataArg.value[0]), file=runargsFile)
146 print('{0}.output{1}FileType = {2!r}'.format(self._runArgsName, dataType, dataArg.type), file=runargsFile)
147
148
149 # Process all of the tweaky special runtime arguments
150 print(os.linesep, "# Extra runargs", file=runargsFile)
151
152 for k, v in self._exe._extraRunargs.items():
153
155 if k in declaredRunargs:
156 if isinstance(self._exe.conf.argdict[k].value, list):
157 msg.debug('Extending runarg {0!s}={1!r}'.format(k, v))
158 print('{0}.{1!s}.extend({2!r})'.format(self._runArgsName, k, v), file=runargsFile)
159 else:
160 msg.debug('Adding runarg {0!s}={1!r}'.format(k, v))
161 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, k, v), file=runargsFile)
162
163
164 print(os.linesep, '# Extra runtime runargs', file=runargsFile)
165 for k, v in self._exe._runtimeRunargs.items():
166 # These options are string converted, not repred, so they can write an option
167 # which is evaluated at runtime
168 # Protect this with try: except: for the Embedding use case
169 msg.debug('Adding runarg {0!s}={1!r}'.format(k, v))
170 if self._exe._isCAEnabled():
171 print(os.linesep.join(('try:',
172 ' {0}.{1!s} = {2!s}'.format(self._runArgsName, k, v),
173 'except AttributeError:',
174 ' print ("WARNING - AttributeError for {0}")'.format(k))), file=runargsFile)
175 else:
176 print(os.linesep.join(('try:',
177 ' {0}.{1!s} = {2!s}'.format(self._runArgsName, k, v),
178 'except AttributeError:',
179 ' print ("WARNING - AttributeError for {0}")'.format(k))), file=runargsFile)
180
181 if self._exe._literalRunargs is not None:
182 print(os.linesep, '# Literal runargs snippets', file=runargsFile)
183 for line in self._exe._literalRunargs:
184 print(line, file=runargsFile)
185
186
187 for dataType in self._exe._dataArgs:
188 print(os.linesep, '# Forced data value arguments', file=runargsFile)
189 if dataType in self._exe.conf.dataDictionary:
190 print('{0}.data{1}arg = {2!r}'.format(self._runArgsName, dataType,
191 self._exe.conf.dataDictionary[dataType].value), file=runargsFile)
192 else:
193 print('# Warning: data type "{0}" is not part of this transform'.format(dataType), file=runargsFile)
194
195 # This adds the correct JO fragment for AthenaMP job, where we need to ask
196 # the FileMgr to produce the requested log and report files
197 # Also, aggregating the workers' logfiles into the mother's makes life
198 # easier for debugging
199 if self._exe._athenaMP:
200 print(os.linesep, '# AthenaMP Options. nprocs = %d' % self._exe._athenaMP, file=runargsFile)
201 # Proxy for both options
202 if self._exe._isCAEnabled():
203 # do not edit flags directly in this case
204 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'athenaMPWorkerTopDir', self._exe._athenaMPWorkerTopDir), file=runargsFile)
205 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'athenaMPOutputReportFile', self._exe._athenaMPFileReport), file=runargsFile)
206 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'athenaMPEventOrdersFile', self._exe._athenaMPEventOrdersFile), file=runargsFile)
207 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'athenaMPCollectSubprocessLogs', True), file=runargsFile)
208 else:
209 print(os.linesep.join((os.linesep,
210 'from AthenaMP.AthenaMPFlags import jobproperties as AthenaMPJobProps',
211 'AthenaMPJobProps.AthenaMPFlags.WorkerTopDir="{0}"'.format(self._exe._athenaMPWorkerTopDir),
212 'AthenaMPJobProps.AthenaMPFlags.OutputReportFile="{0}"'.format(self._exe._athenaMPFileReport),
213 'AthenaMPJobProps.AthenaMPFlags.EventOrdersFile="{0}"'.format(self._exe._athenaMPEventOrdersFile),
214 'AthenaMPJobProps.AthenaMPFlags.CollectSubprocessLogs=True'
215 )), file=runargsFile)
216 if self._exe._athenaMPStrategy:
217 if self._exe._isCAEnabled():
218 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'athenaMPStrategy', self._exe._athenaMPStrategy), file=runargsFile)
219 else:
220 # Beware of clobbering a non default value (a feature used by EventService)
221 print('if AthenaMPJobProps.AthenaMPFlags.Strategy.isDefault():', file=runargsFile)
222 print('\tAthenaMPJobProps.AthenaMPFlags.Strategy="{0}"'.format(self._exe._athenaMPStrategy), file=runargsFile)
223 if self._exe._athenaMPReadEventOrders:
224 if os.path.isfile(self._exe._athenaMPEventOrdersFile):
225 if self._exe._isCAEnabled():
226 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'athenaMPReadEventOrders', True), file=runargsFile)
227 else:
228 print('AthenaMPJobProps.AthenaMPFlags.ReadEventOrders=True', file=runargsFile)
229 else:
230 raise trfExceptions.TransformExecutionException(trfExit.nameToCode("TRF_EXEC_RUNARGS_ERROR"), "Failed to find file: {0} required by athenaMP option: --athenaMPUseEventOrders true".format(self._exe._athenaMPEventOrdersFile))
231 if 'athenaMPEventsBeforeFork' in self._exe.conf.argdict:
232 if self._exe._isCAEnabled():
233 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'athenaMPEventsBeforeFork', self._exe.conf.argdict['athenaMPEventsBeforeFork'].value), file=runargsFile)
234 else:
235 print('AthenaMPJobProps.AthenaMPFlags.EventsBeforeFork={0}'.format(self._exe.conf.argdict['athenaMPEventsBeforeFork'].value), file=runargsFile)
236 if 'sharedWriter' in self._exe.conf.argdict:
237 if self._exe._isCAEnabled():
238 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'sharedWriter', self._exe.conf.argdict['sharedWriter'].value), file=runargsFile)
239 else:
240 print(f"AthenaMPJobProps.AthenaMPFlags.UseSharedWriter={self._exe.conf.argdict['sharedWriter'].value}", file=runargsFile)
241 if 'parallelCompression' in self._exe.conf.argdict:
242 if self._exe._isCAEnabled():
243 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'parallelCompression', self._exe.conf.argdict['parallelCompression'].value), file=runargsFile)
244 else:
245 print(f"AthenaMPJobProps.AthenaMPFlags.UseParallelCompression={self._exe.conf.argdict['parallelCompression'].value}", file=runargsFile)
246
247 # Executor substeps
248 print(os.linesep, '# Executor flags', file=runargsFile)
249 msg.debug('Adding runarg {0!s}={1!r}'.format('totalExecutorSteps', self._exe.conf.totalExecutorSteps))
250 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'totalExecutorSteps', self._exe.conf.totalExecutorSteps), file=runargsFile)
251 if self._exe.conf.executorStep >= 0:
252 msg.debug('Adding runarg {0!s}={1!r}'.format('executorStep', self._exe.conf.executorStep))
253 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorStep', self._exe.conf.executorStep), file=runargsFile)
254 executorEventCounts, executorEventSkips = getExecutorStepEventCounts(self._exe)
255 msg.debug('Adding runarg {0!s}={1!r}'.format('executorEventCounts', executorEventCounts))
256 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorEventCounts', executorEventCounts), file=runargsFile)
257 msg.debug('Adding runarg {0!s}={1!r}'.format('executorEventSkips', executorEventSkips))
258 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'executorEventSkips', executorEventSkips), file=runargsFile)
259
260 # CA
261 if self._exe._isCAEnabled():
262 print(os.linesep, '# Threading flags', file=runargsFile)
263 # Pass the number of threads and processes
264 nprocs = self._exe._athenaMP
265 threads = self._exe._athenaMT
266 concurrentEvents = self._exe._athenaConcurrentEvents
267 msg.debug('Adding runarg {0!s}={1!r}'.format('nprocs', nprocs))
268 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'nprocs', nprocs), file=runargsFile)
269 msg.debug('Adding runarg {0!s}={1!r}'.format('threads', threads))
270 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'threads', threads), file=runargsFile)
271 msg.debug('Adding runarg {0!s}={1!r}'.format('concurrentEvents', concurrentEvents))
272 print('{0}.{1!s} = {2!r}'.format(self._runArgsName, 'concurrentEvents', concurrentEvents), file=runargsFile)
273 #ComponentAccumulator based config, import skeleton here:
274 print(os.linesep, '# Import skeleton and execute it', file=runargsFile)
275 print('from {0} import fromRunArgs'.format(self._exe._skeletonCA),file=runargsFile)
276 print('fromRunArgs({0})'.format(self._runArgsName),file=runargsFile)
277
278 msg.info('Successfully wrote runargs file {0}'.format(self._runArgsFile))
279
280 except OSError as e:
281 errMsg = 'Got an error when writing JO template {0}: {1}'.format(self._runArgsFile, e)
282 msg.error(errMsg)
283 raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_RUNARGS_ERROR'), errMsg)
284
285
286
void print(char *figname, TCanvas *c1)

Member Data Documentation

◆ _exe

python.trfJobOptions.JobOptionsTemplate._exe = exe
protected

Definition at line 47 of file trfJobOptions.py.

◆ _runArgsFile

python.trfJobOptions.JobOptionsTemplate._runArgsFile = 'runargs.' + self._exe.name + '.py'
protected

Definition at line 50 of file trfJobOptions.py.

◆ _runArgsName

python.trfJobOptions.JobOptionsTemplate._runArgsName = runArgsName
protected
Note
extraRunargs are passed using repr, i.e., they should be constants
: What to do if this is a CLI argument as well, in particular for arguments like preExec we want to add to the list, not replace it

Definition at line 49 of file trfJobOptions.py.

◆ _version

python.trfJobOptions.JobOptionsTemplate._version = version
protected

Definition at line 48 of file trfJobOptions.py.


The documentation for this class was generated from the following file: