14 msg = logging.getLogger(__name__)
17 import PyJobTransforms.trfExceptions
as trfExceptions
18 from PyJobTransforms.trfExeStepTools
import getExecutorStepEventCounts
27 """Dynamic class that holds the run arguments as named members with values."""
29 myself =
'RunArguments:'
31 if not arg.startswith(
'__'):
32 myself +=
'%s %s = %s' % (os.linesep, arg,
repr(getattr(self, arg)))
39 """For generating runArgs JobOptions file """
46 def __init__(self, exe, version=None, runArgsName='runArgs'):
54 if self.
_exe._isCAEnabled():
55 if self.
_exe._skeletonCA
is None:
56 errMsg =
"ComponentAccumulator requested but this transform does not supply a ComponentAccumulator-based skeleton file"
60 if self.
_exe._skeleton
is None:
61 errMsg =
"Legacy configuration requested but this transform does not supply a legacy skeleton file"
67 msg.info(
'Writing runArgs to file \"%s\"', self.
_runArgsFile)
71 if self.
_exe._isCAEnabled():
73 print(
"#!/usr/bin/env athena.py", file=runargsFile)
75 print(os.linesep.join((
"# Run arguments file auto-generated on {0} by:".
format(time.asctime()),
81 print(os.linesep.join((
"# Import runArgs class",
82 "from PyJobTransforms.trfJobOptions import RunArguments",
92 for k, v
in self.
_exe.conf.argdict.items():
99 msg.debug(
'Argument {0} is a runarg, will be added to JO file (value {1})'.
format(k, v.value))
105 myValue = v.returnMyValue(exe = self.
_exe)
106 if myValue
is not None:
108 msg.debug(
'Added substep type argument {0} as: {1}'.
format(k, myValue))
109 declaredRunargs.append(k)
112 declaredRunargs.append(k)
114 msg.debug(
'Argument {0} is not a runarg - ignored'.
format(k))
118 if 'maxEvents' not in declaredRunargs:
119 print(os.linesep.join((
"",
"# Explicitly added to process all events in this step",
121 )), file=runargsFile)
124 print(os.linesep,
"# Input data", file=runargsFile)
125 for dataType, dataArg
in input.items():
126 if isinstance(dataArg, list)
and dataArg:
127 dataArgStep = dataArg[self.
_exe.conf.executorStep]
131 if dataArgStep.isCached(metadataKeys = [
'nentries']):
132 print(
'{0}.input{1}FileNentries = {2!r}'.
format(self.
_runArgsName, dataType, dataArgStep.nentries), file=runargsFile)
138 if dataArg.isCached(metadataKeys = [
'nentries']):
139 print(
'{0}.input{1}FileNentries = {2!r}'.
format(self.
_runArgsName, dataType, dataArg.nentries), file=runargsFile)
142 print(os.linesep,
"# Output data", file=runargsFile)
143 for dataType, dataArg
in output.items():
150 print(os.linesep,
"# Extra runargs", file=runargsFile)
152 for k, v
in self.
_exe._extraRunargs.items():
155 if k
in declaredRunargs:
156 if isinstance(self.
_exe.conf.argdict[k].value, list):
157 msg.debug(
'Extending runarg {0!s}={1!r}'.
format(k, v))
160 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(k, v))
164 print(os.linesep,
'# Extra runtime runargs', file=runargsFile)
165 for k, v
in self.
_exe._runtimeRunargs.items():
169 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(k, v))
170 if self.
_exe._isCAEnabled():
171 print(os.linesep.join((
'try:',
173 'except AttributeError:',
174 ' print ("WARNING - AttributeError for {0}")'.
format(k))), file=runargsFile)
176 print(os.linesep.join((
'try:',
178 'except AttributeError:',
179 ' print ("WARNING - AttributeError for {0}")'.
format(k))), file=runargsFile)
181 if self.
_exe._literalRunargs
is not None:
182 print(os.linesep,
'# Literal runargs snippets', file=runargsFile)
183 for line
in self.
_exe._literalRunargs:
184 print(line, file=runargsFile)
187 for dataType
in self.
_exe._dataArgs:
188 print(os.linesep,
'# Forced data value arguments', file=runargsFile)
189 if dataType
in self.
_exe.conf.dataDictionary:
191 self.
_exe.conf.dataDictionary[dataType].value), file=runargsFile)
193 print(
'# Warning: data type "{0}" is not part of this transform'.
format(dataType), file=runargsFile)
199 if self.
_exe._athenaMP:
200 print(os.linesep,
'# AthenaMP Options. nprocs = %d' % self.
_exe._athenaMP, file=runargsFile)
202 if self.
_exe._isCAEnabled():
206 print(
'{0}.{1!s} = {2!r}'.
format(self.
_runArgsName,
'athenaMPEventOrdersFile', self.
_exe._athenaMPEventOrdersFile), file=runargsFile)
209 print(os.linesep.join((os.linesep,
210 'from AthenaMP.AthenaMPFlags import jobproperties as AthenaMPJobProps',
211 'AthenaMPJobProps.AthenaMPFlags.WorkerTopDir="{0}"'.
format(self.
_exe._athenaMPWorkerTopDir),
212 'AthenaMPJobProps.AthenaMPFlags.OutputReportFile="{0}"'.
format(self.
_exe._athenaMPFileReport),
213 'AthenaMPJobProps.AthenaMPFlags.EventOrdersFile="{0}"'.
format(self.
_exe._athenaMPEventOrdersFile),
214 'AthenaMPJobProps.AthenaMPFlags.CollectSubprocessLogs=True'
215 )), file=runargsFile)
216 if self.
_exe._athenaMPStrategy:
217 if self.
_exe._isCAEnabled():
221 print(
'if AthenaMPJobProps.AthenaMPFlags.Strategy.isDefault():', file=runargsFile)
222 print(
'\tAthenaMPJobProps.AthenaMPFlags.Strategy="{0}"'.
format(self.
_exe._athenaMPStrategy), file=runargsFile)
223 if self.
_exe._athenaMPReadEventOrders:
224 if os.path.isfile(self.
_exe._athenaMPEventOrdersFile):
225 if self.
_exe._isCAEnabled():
228 print(
'AthenaMPJobProps.AthenaMPFlags.ReadEventOrders=True', file=runargsFile)
231 if 'athenaMPEventsBeforeFork' in self.
_exe.conf.argdict:
232 if self.
_exe._isCAEnabled():
233 print(
'{0}.{1!s} = {2!r}'.
format(self.
_runArgsName,
'athenaMPEventsBeforeFork', self.
_exe.conf.argdict[
'athenaMPEventsBeforeFork'].value), file=runargsFile)
235 print(
'AthenaMPJobProps.AthenaMPFlags.EventsBeforeFork={0}'.
format(self.
_exe.conf.argdict[
'athenaMPEventsBeforeFork'].value), file=runargsFile)
236 if 'sharedWriter' in self.
_exe.conf.argdict:
237 if self.
_exe._isCAEnabled():
238 print(
'{0}.{1!s} = {2!r}'.
format(self.
_runArgsName,
'sharedWriter', self.
_exe.conf.argdict[
'sharedWriter'].value), file=runargsFile)
240 print(f
"AthenaMPJobProps.AthenaMPFlags.UseSharedWriter={self._exe.conf.argdict['sharedWriter'].value}", file=runargsFile)
241 if 'parallelCompression' in self.
_exe.conf.argdict:
242 if self.
_exe._isCAEnabled():
243 print(
'{0}.{1!s} = {2!r}'.
format(self.
_runArgsName,
'parallelCompression', self.
_exe.conf.argdict[
'parallelCompression'].value), file=runargsFile)
245 print(f
"AthenaMPJobProps.AthenaMPFlags.UseParallelCompression={self._exe.conf.argdict['parallelCompression'].value}", file=runargsFile)
248 print(os.linesep,
'# Executor flags', file=runargsFile)
249 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(
'totalExecutorSteps', self.
_exe.conf.totalExecutorSteps))
251 if self.
_exe.conf.executorStep >= 0:
252 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(
'executorStep', self.
_exe.conf.executorStep))
255 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(
'executorEventCounts', executorEventCounts))
256 print(
'{0}.{1!s} = {2!r}'.
format(self.
_runArgsName,
'executorEventCounts', executorEventCounts), file=runargsFile)
257 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(
'executorEventSkips', executorEventSkips))
258 print(
'{0}.{1!s} = {2!r}'.
format(self.
_runArgsName,
'executorEventSkips', executorEventSkips), file=runargsFile)
261 if self.
_exe._isCAEnabled():
262 print(os.linesep,
'# Threading flags', file=runargsFile)
264 nprocs = self.
_exe._athenaMP
265 threads = self.
_exe._athenaMT
266 concurrentEvents = self.
_exe._athenaConcurrentEvents
267 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(
'nprocs', nprocs))
269 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(
'threads', threads))
271 msg.debug(
'Adding runarg {0!s}={1!r}'.
format(
'concurrentEvents', concurrentEvents))
274 print(os.linesep,
'# Import skeleton and execute it', file=runargsFile)
275 print(
'from {0} import fromRunArgs'.
format(self.
_exe._skeletonCA),file=runargsFile)
281 errMsg =
'Got an error when writing JO template {0}: {1}'.
format(self.
_runArgsFile, e)
291 msg.warning(
'No runArgs available')
294 msg.warning(
'Could not find runArgs file %s', self.
_runArgsFile)
297 if self.
_exe._skeleton:
298 for skeleton
in self.
_exe._skeleton:
299 if not findFile(os.environ[
"JOBOPTSEARCHPATH"], skeleton):
300 msg.warning(
'Could not find job options skeleton file %s', skeleton)
316 if self.
_exe._isCAEnabled():