def preExecute(self, input=None, output=None):
    """Prepare the command line, environment and wrapper script for this substep.

    Steps, in order:

    * If ``self._inputEventTest`` is set and a non-``None`` ``skipEvents`` value
      applies to this substep, raise ``TRF_NOEVENTS`` when ``skipEvents`` is at
      least the (integer) ``nentries`` of any input. Non-integer counts are warned
      about and skipped.
    * When a skeleton is configured, build the input/output file maps and ask
      ``self._jobOptionsTemplate`` for the top options files.
    * Record the input/output data types in ``self._extraMetadata``.
    * Resolve the asetup string, the container OS (legacy release or an explicit
      ``runInContainer``) and the DBRelease setup.
    * Build ``self._cmd`` via ``_prepAthenaCommandLine``. For athenaHLT, also:
      - append the translated options and, in CA mode, the CA skeleton plus
        ``preExec``;
      - optionally run the debug-stream pre-run step;
      - refuse to run over pre-existing ``*_HLTMPPy_*.data`` files.
    * Call ``preExecute`` of the class after ``athenaExecutor`` in the MRO, so
      athenaExecutor's own ``preExecute`` is bypassed (presumably because this
      method replaces it). Then write the athena wrapper script.

    :param input: data types consumed by this substep (``None`` means none).
    :param output: data types produced by this substep (``None`` means none).
    :raises trfExceptions.TransformExecutionException: ``TRF_NOEVENTS`` when no
        events are left after ``skipEvents``; ``TRF_OUTPUT_FILE_ERROR`` when files
        matching ``*_HLTMPPy_*.data`` are already found; ``TRF_EXEC_SETUP_FAIL``
        when a container OS is set but no release (asetup string) is known.
    """
    # None defaults avoid sharing one mutable set() across calls. An omitted
    # argument still means "no data types", exactly as before.
    input = set() if input is None else input
    output = set() if output is None else output
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    def argValue(key):
        # This substep's value of the transform argument ``key``.
        return self.conf.argdict[key].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)

    def resolveOSSetup(asetupString, OSSetupString):
        # Return the container OS to use for this substep, starting from the
        # current choice ``OSSetupString``:
        # - a release that asetupReleaseIsOlderThan(..., 24) reports as legacy
        #   gets a centos7 container, unless ALRB_USER_PLATFORM is already centos7;
        # - an explicit runInContainer argument always takes precedence.
        # The platform variable is only read once a legacy release is detected.
        if (asetupString is not None and asetupReleaseIsOlderThan(asetupString, 24)
                and "centos7" not in os.environ['ALRB_USER_PLATFORM']):
            OSSetupString = "centos7"
            msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, OSSetupString))
        if 'runInContainer' in self.conf.argdict:
            OSSetupString = argValue('runInContainer')
            msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, OSSetupString))
        return OSSetupString

    # Check that events remain to be processed once skipEvents is applied
    if self._inputEventTest and 'skipEvents' in self.conf.argdict:
        skipEvents = argValue('skipEvents')
        if skipEvents is not None:
            msg.debug('Will test for events to process')
            for dataType in input:
                inputEvents = self.conf.dataDictionary[dataType].nentries
                msg.debug('Got {} events for {}'.format(inputEvents, dataType))
                if not isinstance(inputEvents, int):
                    msg.warning('Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
                elif skipEvents >= inputEvents:
                    raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_NOEVENTS'),
                        'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(skipEvents, inputEvents, dataType))

    # getTranslated (below) needs the output file map even when no skeleton is
    # configured, so it is built unconditionally.
    outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

    if self._skeleton is not None:
        inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
        # Also include any 'extra' input file arguments registered for this executor
        for dataArg in self.conf.dataDictionary.values():
            if dataArg.io == 'input' and self._name in dataArg.executor:
                inputFiles[dataArg.subtype] = dataArg

        msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))

        self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input=inputFiles,
                                                                       output=outputFiles)

    # Input/output data types are only known at this point, not in __init__
    if input:
        self._extraMetadata['inputs'] = list(input)
    if output:
        self._extraMetadata['outputs'] = list(output)

    asetupString = None
    if 'asetup' in self.conf.argdict:
        asetupString = argValue('asetup')
    else:
        msg.info('Asetup report: {0}'.format(asetupReport()))

    OSSetupString = resolveOSSetup(asetupString, None)

    dbrelease = dbsetup = None
    if 'DBRelease' in self.conf.argdict:
        dbrelease = argValue('DBRelease')
    if dbrelease:
        # A classic tarball has the form DBRelease-X.Y.Z.tar.gz
        dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
        if dbdMatch:
            msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
            if not os.access(dbrelease, os.R_OK):
                # Tarball not readable: fall back to the X.Y.Z version on cvmfs
                msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                dbrelease = dbdMatch.group(1)
                dbsetup = cvmfsDBReleaseCheck(dbrelease)
            else:
                unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                if unpacked:
                    setupDBRelease(dbsetup)
        else:
            # Not a tarball name: treat the value as a cvmfs release string
            dbsetup = cvmfsDBReleaseCheck(dbrelease)

    self._envUpdate = trfEnv.environmentUpdate()

    self._prepAthenaCommandLine()

    if 'athenaHLT' in self._exe:
        # athenaHLT gets its settings as translated command-line options rather
        # than from the runargs file, so drop the runargs file from the command
        self._cmd.remove('runargs.BSRDOtoRAW.py')
        optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep,
                                   first=self.conf.firstExecutor, output=outputFiles)
        self._cmd.extend(optionList)

        if self._isCAEnabled():
            msg.info("Running in CA mode")
            self._cmd.append(self._skeletonCA)
            if 'preExec' in self.conf.argdict:
                self._cmd.extend(argValue('preExec'))
            msg.info('Command adjusted for CA to %s', self._cmd)
        else:
            msg.info("Running in legacy mode")

        if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
            # The debug-stream pre-run step may supply an asetup string and a DB alias
            dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'],
                                                          self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value,
                                                          self.conf.argdict)

            if asetupString is None and dbgAsetupString is not None:
                asetupString = dbgAsetupString
                msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
                # The release changed, so re-evaluate the container OS for it
                OSSetupString = resolveOSSetup(asetupString, OSSetupString)

            if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
                msg.warning("Database alias will be set to %s", dbAlias)
                # Appended as one token; presumably the wrapper joins _cmd with spaces — TODO confirm
                self._cmd.append("--db-server " + dbAlias)
        else:
            msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")

        # Refuse to run when files matching the expected output pattern already
        # exist, presumably so they cannot be mixed up with this step's output
        if 'BS' in self.conf.dataDictionary or 'DRAW_TRIGCOST' in self.conf.dataDictionary or 'HIST_DEBUGSTREAMMON' in self.conf.dataDictionary:
            expectedOutputFileName = '*_HLTMPPy_*.data'
            matchedOutputFileNames = self._findOutputFiles(expectedOutputFileName)
            if matchedOutputFileNames:
                msg.error(f'Directory already contains files with expected output name format ({expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
                raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                    f'Directory already contains files with expected output name format {expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')

    # A container can only be set up for a known release
    if OSSetupString is not None and asetupString is None:
        raise trfExceptions.TransformExecutionException(trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')

    # Deliberately starts the MRO lookup after athenaExecutor
    super(athenaExecutor, self).preExecute(input, output)

    msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
    self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
    msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))