37 def preExecute(self, input = set(), output =
set()):
38 msg.debug(
'Preparing for execution of {0} with inputs {1} and outputs {2}'.
format(self.name, input, output))
41 if (self._inputEventTest
and 'skipEvents' in self.conf.argdict
and
42 self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
is not None):
43 msg.debug(
'Will test for events to process')
44 for dataType
in input:
45 inputEvents = self.conf.dataDictionary[dataType].nentries
46 msg.debug(
'Got {0} events for {1}'.
format(inputEvents, dataType))
47 if not isinstance(inputEvents, six.integer_types):
48 msg.warning(
'Are input events countable? Got nevents={0} so disabling event count check for this input'.
format(inputEvents))
49 elif self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor) >= inputEvents:
50 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_NOEVENTS'),
51 'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2}'.
format(self.conf.argdict[
'skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor), inputEvents, dataType))
54 if self._skeleton
is not None:
56 for dataType
in input:
57 inputFiles[dataType] = self.conf.dataDictionary[dataType]
59 for dataType
in output:
60 outputFiles[dataType] = self.conf.dataDictionary[dataType]
63 for dataType, dataArg
in self.conf.dataDictionary.items():
64 if dataArg.io ==
'input' and self._name
in dataArg.executor:
65 inputFiles[dataArg.subtype] = dataArg
67 msg.info(
'Input Files: {0}; Output Files: {1}'.
format(inputFiles, outputFiles))
70 self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
76 self._extraMetadata[
'inputs'] =
list(input)
78 self._extraMetadata[
'outputs'] =
list(output)
82 if 'asetup' in self.conf.argdict:
83 asetupString = self.conf.argdict[
'asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
89 if asetupString
is not None:
91 currentOS = os.environ[
'ALRB_USER_PLATFORM']
92 if legacyOSRelease
and "centos7" not in currentOS:
93 OSSetupString =
"centos7"
94 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
97 if 'runInContainer' in self.conf.argdict:
98 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
99 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
102 dbrelease = dbsetup =
None
103 if 'DBRelease' in self.conf.argdict:
104 dbrelease = self.conf.argdict[
'DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
107 dbdMatch = re.match(
r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
109 msg.debug(
'DBRelease setting {0} matches classic tarball file'.
format(dbrelease))
110 if not os.access(dbrelease, os.R_OK):
111 msg.warning(
'Transform was given tarball DBRelease file {0}, but this is not there'.
format(dbrelease))
112 msg.warning(
'I will now try to find DBRelease {0} in cvmfs'.
format(dbdMatch.group(1)))
113 dbrelease = dbdMatch.group(1)
117 unpacked, dbsetup =
unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
126 self._envUpdate = trfEnv.environmentUpdate()
129 self._prepAthenaCommandLine()
132 if 'athenaHLT' in self._exe:
133 self._cmd.
remove(
'runargs.BSRDOtoRAW.py')
135 optionList =
getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
136 self._cmd.
extend(optionList)
138 if self._isCAEnabled():
139 msg.info(
"Running in CA mode")
141 self._cmd.
append(self._skeletonCA)
142 if 'preExec' in self.conf.argdict:
143 self._cmd.
extend(self.conf.argdict[
'preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
144 msg.info(
'Command adjusted for CA to %s', self._cmd)
146 msg.info(
"Running in legacy mode")
149 if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
151 dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary[
'BS_RDO'], self.conf.dataDictionary[
'HIST_DEBUGSTREAMMON'].value, self.conf.argdict)
154 if asetupString
is None and dbgAsetupString
is not None:
155 asetupString = dbgAsetupString
156 msg.info(
'Will use asetup string for debug stream analysis %s', dbgAsetupString)
159 currentOS = os.environ[
'ALRB_USER_PLATFORM']
160 if legacyOSRelease
and "centos7" not in currentOS:
161 OSSetupString =
"centos7"
162 msg.info(
'Legacy release required for the substep {}, will setup a container running {}'.
format(self._substep, OSSetupString))
164 if 'runInContainer' in self.conf.argdict:
165 OSSetupString = self.conf.argdict[
'runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
166 msg.info(
'The step {} will be performed in a container running {}, as explicitly requested'.
format(self._substep, OSSetupString))
169 if 'useDB' in self.conf.argdict
and 'DBserver' not in self.conf.argdict
and dbAlias:
170 msg.warn(
"Database alias will be set to %s", dbAlias)
171 self._cmd.
append(
"--db-server " + dbAlias)
173 msg.info(
"Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")
177 if 'BS' in self.conf.dataDictionary
or 'DRAW_TRIGCOST' in self.conf.dataDictionary
or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
179 expectedOutputFileName =
'*_HLTMPPy_*.data'
181 matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
183 if len(matchedOutputFileNames) > 0:
184 msg.error(f
'Directoy already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
185 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_OUTPUT_FILE_ERROR'),
186 f
'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')
189 if OSSetupString
is not None and asetupString
is None:
190 raise trfExceptions.TransformExecutionException(trfExit.nameToCode(
'TRF_EXEC_SETUP_FAIL'),
191 'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')
194 super(athenaExecutor, self).preExecute(input, output)
199 msg.info(
'Now writing wrapper for substep executor {0}'.
format(self._name))
200 self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
201 msg.info(
'Athena will be executed in a subshell via {0}'.
format(self._cmd))