def preExecute(self, input=None, output=None):
    """Prepare this trigger substep for execution.

    In order:
      * selects the ``--trigExe`` executable;
      * checks that ``skipEvents`` leaves events to process;
      * prepares skeleton top options when a skeleton is configured;
      * resolves the asetup release, container OS and DBRelease setup;
      * builds the command line (CA or legacy mode, optional debug-stream
        pre-run);
      * refuses to overwrite existing output files;
      * calls ``super(athenaExecutor, self).preExecute`` and writes the
        wrapper script.

    :param input: iterable of input data types (keys of
        ``self.conf.dataDictionary``); ``None`` is treated as empty.
    :param output: iterable of output data types (keys of
        ``self.conf.dataDictionary``); ``None`` is treated as empty.
    :raises trfExceptions.TransformExecutionException:
        ``TRF_NOEVENTS`` if ``skipEvents`` >= the event count of an input;
        ``TRF_OUTPUT_FILE_ERROR`` if files matching
        ``self.expectedOutputFileName`` already exist;
        ``TRF_EXEC_SETUP_FAIL`` if a container OS is required but no asetup
        release is known.
    """
    # Normalise here instead of using set() defaults, which would be shared
    # between calls. `input` shadows the builtin but is part of the interface.
    if input is None:
        input = set()
    if output is None:
        output = set()
    msg.debug('Preparing for execution of {0} with inputs {1} and outputs {2}'.format(self.name, input, output))

    # The executable comes from --trigExe and is launched via setsid.
    self._exe = 'setsid ' + self.conf.argdict['trigExe'].value

    # Check we actually have events to process once skipEvents is applied.
    if self._inputEventTest and 'skipEvents' in self.conf.argdict:
        skipEvents = self.conf.argdict['skipEvents'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        if skipEvents is not None:
            msg.debug('Will test for events to process')
            for dataType in input:
                inputEvents = self.conf.dataDictionary[dataType].nentries
                msg.debug('Got {} events for {}'.format(inputEvents, dataType))
                if not isinstance(inputEvents, int):
                    msg.warning('Are input events countable? Got nevents={} so disabling event count check for this input'.format(inputEvents))
                elif skipEvents >= inputEvents:
                    raise trfExceptions.TransformExecutionException(
                        trfExit.nameToCode('TRF_NOEVENTS'),
                        'No events to process: {0} (skipEvents) >= {1} (inputEvents of {2})'.format(skipEvents, inputEvents, dataType))

    # Map data types to their argument objects. These are built
    # unconditionally because getTranslated() below needs outputFiles even
    # when no skeleton is configured. Previously they only existed inside the
    # skeleton branch, which raised NameError when self._skeleton was None.
    inputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in input}
    outputFiles = {dataType: self.conf.dataDictionary[dataType] for dataType in output}

    if self._skeleton is not None:
        # Also hand the skeleton every input argument routed to this
        # executor, keyed by its subtype.
        for dataArg in self.conf.dataDictionary.values():
            if dataArg.io == 'input' and self._name in dataArg.executor:
                inputFiles[dataArg.subtype] = dataArg

        msg.info('Input Files: {0}; Output Files: {1}'.format(inputFiles, outputFiles))
        self._topOptionsFiles = self._jobOptionsTemplate.getTopOptions(input = inputFiles,
                                                                       output = outputFiles)

    # Record which data types this substep handles.
    if input:
        self._extraMetadata['inputs'] = list(input)
    if output:
        self._extraMetadata['outputs'] = list(output)

    # Release to set up in the wrapper, if one was given explicitly.
    asetupString = None
    if 'asetup' in self.conf.argdict:
        asetupString = self.conf.argdict['asetup'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
    else:
        msg.info('Asetup report: {0}'.format(asetupReport()))

    def _resolveContainerOS(asetup, current):
        """Return the container OS to use for release `asetup`, starting from `current`.

        Switches to centos7 when asetupReleaseIsOlderThan(asetup, 24) holds
        and the host platform (ALRB_USER_PLATFORM) is not centos7. An
        explicit --runInContainer value always takes precedence. The host
        platform is only read when a legacy release actually needs it.
        """
        osSetup = current
        if asetup is not None and asetupReleaseIsOlderThan(asetup, 24):
            if "centos7" not in os.environ['ALRB_USER_PLATFORM']:
                osSetup = "centos7"
                msg.info('Legacy release required for the substep {}, will setup a container running {}'.format(self._substep, osSetup))
        # Allow overriding the container OS using a flag.
        if 'runInContainer' in self.conf.argdict:
            osSetup = self.conf.argdict['runInContainer'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
            msg.info('The step {} will be performed in a container running {}, as explicitly requested'.format(self._substep, osSetup))
        return osSetup

    # Conditional containerisation.
    OSSetupString = _resolveContainerOS(asetupString, None)

    # DBRelease: a readable classic tarball is unpacked; otherwise the
    # release is looked up via cvmfsDBReleaseCheck.
    dbrelease = dbsetup = None
    if 'DBRelease' in self.conf.argdict:
        dbrelease = self.conf.argdict['DBRelease'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor)
        if dbrelease:
            dbdMatch = re.match(r'DBRelease-([\d\.]+)\.tar\.gz', os.path.basename(dbrelease))
            if dbdMatch:
                msg.debug('DBRelease setting {0} matches classic tarball file'.format(dbrelease))
                if not os.access(dbrelease, os.R_OK):
                    msg.warning('Transform was given tarball DBRelease file {0}, but this is not there'.format(dbrelease))
                    msg.warning('I will now try to find DBRelease {0} in cvmfs'.format(dbdMatch.group(1)))
                    # Fall back to the version number parsed from the file name.
                    dbrelease = dbdMatch.group(1)
                    dbsetup = cvmfsDBReleaseCheck(dbrelease)
                else:
                    unpacked, dbsetup = unpackDBRelease(tarball=dbrelease, dbversion=dbdMatch.group(1))
                    if unpacked:
                        setupDBRelease(dbsetup)
            else:
                # Not a tarball file name: presumably a release version/path to look up.
                dbsetup = cvmfsDBReleaseCheck(dbrelease)

    self._envUpdate = trfEnv.environmentUpdate()

    self._prepAthenaCommandLine()

    # The runargs file added by _prepAthenaCommandLine is dropped for
    # athenaHLT/athenaEF (presumably they cannot consume it).
    if 'athenaHLT' in self._exe or 'athenaEF' in self._exe:
        self._cmd.remove('runargs.BSRDOtoRAW.py')

    # Translate transform arguments into command-line options.
    optionList = getTranslated(self.conf.argdict, name=self._name, substep=self._substep, first=self.conf.firstExecutor, output = outputFiles)
    self._cmd.extend(optionList)

    if self._isCAEnabled():
        msg.info("Running in CA mode")
        # Pass the CA skeleton and any preExec directly on the command line.
        self._cmd.append(self._skeletonCA)
        if 'preExec' in self.conf.argdict:
            self._cmd.extend(self.conf.argdict['preExec'].returnMyValue(name=self._name, substep=self._substep, first=self.conf.firstExecutor))
        msg.info('Command adjusted for CA to %s', self._cmd)
    else:
        msg.info("Running in legacy mode")

    if "outputHIST_DEBUGSTREAMMONFile" in self.conf.argdict:
        # Debug-stream pre-run; it may supply an asetup string and a DB alias.
        dbgAsetupString, dbAlias = dbgStream.dbgPreRun(self.conf.dataDictionary['BS_RDO'], self.conf.dataDictionary['HIST_DEBUGSTREAMMON'].value, self.conf.argdict)

        # Only fall back to the debug-stream release when none was given.
        if asetupString is None and dbgAsetupString is not None:
            asetupString = dbgAsetupString
            msg.info('Will use asetup string for debug stream analysis %s', dbgAsetupString)
            OSSetupString = _resolveContainerOS(asetupString, OSSetupString)

        # Use the alias as DB server only if useDB is set and no DBserver was given.
        if 'useDB' in self.conf.argdict and 'DBserver' not in self.conf.argdict and dbAlias:
            msg.warning("Database alias will be set to %s", dbAlias)
            self._cmd.append("--db-server " + dbAlias)
    else:
        msg.info("Flag outputHIST_DEBUGSTREAMMONFile not defined - debug stream analysis will not run.")

    # Refuse to run if files with the expected output name already exist.
    if any(key in self.conf.dataDictionary for key in ('BS', 'DRAW_TRIGCOST', 'HIST_DEBUGSTREAMMON')):
        matchedOutputFileNames = self._findOutputFiles(self.expectedOutputFileName)
        if matchedOutputFileNames:
            msg.error(f'Directory already contains files with expected output name format ({self.expectedOutputFileName}), please remove/rename these first: {matchedOutputFileNames}')
            raise trfExceptions.TransformExecutionException(
                trfExit.nameToCode('TRF_OUTPUT_FILE_ERROR'),
                f'Directory already contains files with expected output name format {self.expectedOutputFileName}, please remove/rename these first: {matchedOutputFileNames}')

    # A container needs a release to set up inside it.
    if OSSetupString is not None and asetupString is None:
        raise trfExceptions.TransformExecutionException(
            trfExit.nameToCode('TRF_EXEC_SETUP_FAIL'),
            'Athena version must be specified for the substep which requires running inside a container (either via --asetup or from DB)')

    # super(athenaExecutor, self) starts the MRO lookup after athenaExecutor,
    # so athenaExecutor.preExecute itself is skipped. This is presumably
    # intentional, since this method redoes that setup (confirm).
    super(athenaExecutor, self).preExecute(input, output)

    msg.info('Now writing wrapper for substep executor {0}'.format(self._name))
    self._writeAthenaWrapper(asetup=asetupString, dbsetup=dbsetup, ossetup=OSSetupString)
    msg.info('Athena will be executed in a subshell via {0}'.format(self._cmd))