36 def preExecute(self, input = set(), output =
set()):
# Prepare this executor for running: check that there are events to process,
# build the input/output file maps and the top options, resolve the asetup /
# container / DBRelease settings, assemble self._cmd, refuse to start when
# files matching the expected output pattern already exist, call preExecute
# further up the class hierarchy and finally write the athena wrapper.
#
# input, output: iterables of data type names. Each name is used as a key
#   into self.conf.dataDictionary and both are copied with list() into
#   self._extraMetadata.
# Raises trfExceptions.TransformExecutionException with the exit codes for
#   TRF_NOEVENTS, TRF_OUTPUT_FILE_ERROR and TRF_EXEC_SETUP_FAIL (see the
#   individual raise sites below).
#
# NOTE(review): the defaults are set() objects created once, when the function
#   is defined. The code visible here only reads them; confirm that the
#   preExecute reached through super() does not mutate them either.
# NOTE(review): the statement numbers in this listing have gaps, and
#   inputFiles, outputFiles, legacyOSRelease and OSSetupString are used
#   without a visible assignment - presumably they are set on lines that are
#   not shown here. Confirm against the complete file.
37 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.
format(self.name, input, output))
# Event-count sanity check. It only runs when self._inputEventTest is truthy,
# a 'skipEvents' argument exists and returnMyValue(...) gives a non-None value
# for this executor name / substep.
40 if (self._inputEventTest
and 'skipEvents' in self.conf.argdict
and
41 self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
is not None):
42 msg.debug(
'Will test for events to process')
43 for dataType
in input:
44 inputEvents = self.conf.dataDictionary[dataType].nentries
45 msg.debug(
'Got {} events for {}'.
format(inputEvents, dataType))
# A non-integer entry count disables the check for this input (warning only).
46 if not isinstance(inputEvents, int):
47 msg.warning(
'Are input events countable? Got nevents={} so disabling event count check for this input'.
format(inputEvents))
# skipEvents >= number of entries means nothing would be left to process.
48 elif self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) >= inputEvents:
# NOTE(review): the message text below opens "(inputEvents of {2}" without a
#   closing parenthesis.
49 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_NOEVENTS'),
50 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.
format(self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor), inputEvents, dataType))
# When a skeleton is configured, map every requested data type to its
# dataDictionary entry.
53 if self._skeleton
is not None:
55 for dataType
in input:
56 inputFiles[dataType] = self.conf.dataDictionary[dataType]
58 for dataType
in output:
59 outputFiles[dataType] = self.conf.dataDictionary[dataType]
# Additionally register every dataDictionary entry whose io is 'input' and
# whose executor attribute contains this executor's name, keyed by subtype.
62 for dataType, dataArg
in self.conf.dataDictionary.items():
63 if dataArg.io ==
'input' and self._name
in dataArg.executor:
64 inputFiles[dataArg.subtype] = dataArg
66 msg.info(
'Input Files: {0}; Output Files: {1}'.
format(inputFiles, outputFiles))
# NOTE(review): this call is cut off after its first keyword argument in the
#   listing; the remaining arguments are not visible here.
69 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
# Record the input and output data type names in the extra metadata.
75 self._extraMetadata[
'inputs'] =
list(input)
77 self._extraMetadata[
'outputs'] =
list(output)
# An explicit 'asetup' argument provides the asetup string for this substep.
81 if 'asetup' in self.conf.argdict:
82 asetupString = self.conf.argdict[
'asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
88 if asetupString
is not None:
# os.environ[...] raises KeyError when ALRB_USER_PLATFORM is not set.
90 currentOS = os.environ[
'ALRB_USER_PLATFORM']
# Select a "centos7" container when legacyOSRelease is truthy and the current
# platform string does not already contain "centos7".
91 if legacyOSRelease
and "centos7" not in currentOS:
92 OSSetupString =
"centos7"
93 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
# An explicit 'runInContainer' argument sets OSSetupString after the automatic
# choice above, so it wins when both apply.
96 if 'runInContainer' in self.conf.argdict:
97 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
98 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
# DBRelease handling: both names start as None and are filled in only when a
# 'DBRelease' argument is present.
101 dbrelease = dbsetup =
None
102 if 'DBRelease' in self.conf.argdict:
103 dbrelease = self.conf.argdict[
'DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
# Match the classic tarball name "DBRelease-<digits and dots>.tar.gz" against
# the basename; group(1) is the version part.
# NOTE(review): dbdMatch.group(1) is used below, but no check that the match
#   succeeded is visible in this listing - presumably it is on a missing line.
106 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
108 msg.debug(
'DBRelease setting {0} matches classic tarball file'.
format(dbrelease))
# Tarball not readable: warn and continue with the bare version string instead
# of the file path.
109 if not os.access(dbrelease, os.R_OK):
110 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.
format(dbrelease))
111 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.
format(dbdMatch.group(1)))
112 dbrelease = dbdMatch.group(1)
# dbsetup from this call is what is later handed to _writeAthenaWrapper.
116 unpacked, dbsetup =
unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
125 self._envUpdate = trfEnv.environmentUpdate()
# Build the base command in self._cmd.
128 self._prepAthenaCommandLine()
# For athenaHLT the 'runargs.BSRDOtoRAW.py' entry is taken out of the command
# again and the translated options are added instead.
# NOTE(review): assuming self._cmd is a list, remove() raises ValueError if
#   that entry is absent - confirm it is always present on this path.
131 if 'athenaHLT' in self._exe:
132 self._cmd.
remove(
'runargs.BSRDOtoRAW.py')
134 optionList =
getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
135 self._cmd.
extend(optionList)
# CA mode: append the CA skeleton and any 'preExec' values to the command.
137 if self._isCAEnabled():
138 msg.info(
"Running in CA mode")
140 self._cmd.
append(self._skeletonCA)
141 if 'preExec' in self.conf.argdict:
142 self._cmd.
extend(self.conf.argdict[
'preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
143 msg.info(
'Command adjusted for CA to %s', self._cmd)
# Presumably the else branch of the CA check (the else line is not visible in
# this listing).
145 msg.info(
"Running in legacy mode")
# Debug stream analysis: dbgPreRun is given the 'BS_RDO' data argument, the
# value of 'HIST_DEBUGSTREAMMON' and the full argument dictionary, and returns
# an asetup string and a database alias.
148 if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
150 dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary[
'BS_RDO'], self.conf.dataDictionary[
'HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
# The debug stream asetup string is only adopted when no asetup string has
# been set so far.
153 if asetupString
is None and dbgAsetupString
is not None:
154 asetupString = dbgAsetupString
155 msg.info(
'Will use asetup string for debug stream analysis %s', dbgAsetupString)
# Same container selection as performed earlier for an explicit asetup string.
158 currentOS = os.environ[
'ALRB_USER_PLATFORM']
159 if legacyOSRelease
and "centos7" not in currentOS:
160 OSSetupString =
"centos7"
161 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
163 if 'runInContainer' in self.conf.argdict:
164 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
165 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
# Pass the database alias on the command line only when 'useDB' is given,
# 'DBserver' is not, and dbAlias is truthy.
# NOTE(review): if msg is a standard logging.Logger, warn() is a deprecated
#   alias of warning(), which the rest of this method uses.
168 if 'useDB' in self.conf.argdict
and 'DBserver' not in self.conf.argdict
and dbAlias:
169 msg.warn(
"Database alias will be set to %s", dbAlias)
170 self._cmd.
append(
"--db-server " + dbAlias)
# Presumably the else branch of the outputHIST_DEBUGSTREAMMONFile check (the
# else line is not visible in this listing).
172 msg.info(
"Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")
# When any of these data types is in the data dictionary, look for files that
# already match the expected output name pattern and abort if there are any.
176 if 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
178 expectedOutputFileName =
'*_HLTMPPy_*.data'
180 matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
182 if len(matchedOutputFileNames) > 0:
# NOTE(review): the logged text reads "Directoy" and wraps the pattern in
#   parentheses; the exception text below reads "Directory" without them.
183 msg.error(f
'Directoy already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
184 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_OUTPUT_FILE_ERROR'),
185 f
'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')
# A container was selected but no asetup string is known: fail before any
# wrapper is written.
188 if OSSetupString
is not None and asetupString
is None:
189 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_EXEC_SETUP_FAIL'),
190 'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')
# super(athenaExecutor, self) starts the method lookup after athenaExecutor in
# the MRO, so athenaExecutor's own preExecute is not the one called here.
193 super(athenaExecutor, self).preExecute(input, output)
# Write the wrapper using the asetup, DBRelease setup and container settings
# resolved above.
198 msg.info(
'Now writing wrapper for substep executor {0}'.
format(self._name))
199 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
200 msg.info(
'Athena will be executed in a subshell via {0}'.
format(self._cmd))