129{
131
132
134 std::ostringstream randStream;
136 ATH_MSG_INFO(
"Using random components for IPC object names: " << randStream.str());
137
138 ServiceHandle<StoreGateSvc> pDetStore(
"DetectorStore",
name());
140
141
144 AthenaInterprocess::SharedQueue* evtQueue = new AthenaInterprocess::SharedQueue("AthenaMPEventQueue_"+randStream.str(),2000,sizeof(long));
145 if(pDetStore->record(evtQueue,"AthenaMPEventQueue_"+randStream.str()).isFailure()) {
146 ATH_MSG_FATAL(
"Unable to record the pointer to the Shared Event queue into Detector Store");
147 delete evtQueue;
148 return StatusCode::FAILURE;
149 }
150 }
151
152
153
155 AthenaInterprocess::SharedQueue* failedPidQueue =
new AthenaInterprocess::SharedQueue(
"AthenaMPFailedPidQueue_"+randStream.str(),100,
sizeof(
pid_t));
156 if(pDetStore->record(failedPidQueue,"AthenaMPFailedPidQueue_"+randStream.str()).isFailure()) {
157 ATH_MSG_FATAL(
"Unable to record the pointer to the Failed PID queue into Detector Store");
158 delete failedPidQueue;
159 return StatusCode::FAILURE;
160 }
161 }
162
163
165 switch(errno) {
166 case EEXIST:
167 {
168
169
170
171 srand((
unsigned)
time(0));
172 std::ostringstream randname;
177
179 ATH_MSG_WARNING(
"The job will attempt to save it with the name " << backupDir <<
" and create new top directory from scratch");
180
183 strerror_r(errno, buf, sizeof(buf));
185 return StatusCode::FAILURE;
186 }
187
188 if(
mkdir(
m_workerTopDir.value().c_str(),S_IRWXU|S_IRGRP|S_IXGRP|S_IROTH|S_IXOTH)==0)
break;
189 }
190
191 default:
192 {
194 strerror_r(errno, buf, sizeof(buf));
196 return StatusCode::FAILURE;
197 }
198 }
199 }
200
201
202 ServiceHandle<IIncidentSvc> incSvc(
"IncidentSvc",
name());
204
206 incSvc->fireIncident(Incident(
name(),
"BeforeFork"));
207 }
208
209
210 std::shared_ptr<AthenaInterprocess::FdsRegistry> registry =
extractFds();
211
212 ToolHandleArray<IAthenaMPTool>::iterator
it =
m_tools.begin(),
214
215
216
217
218
219 auto sharedWriterTool =
m_tools[
"SharedWriterTool"];
221
222 if(sharedWriterWithFAFE) {
223 m_dataShare = SmartIF<IDataShare>(serviceLocator()->service(
"AthenaPoolSharedIOCnvSvc"));
225
226 (*sharedWriterTool)->useFdsRegistry(registry);
227 (*sharedWriterTool)->setRandString(randStream.str());
228
230 if(nChildren==-1) {
231 ATH_MSG_FATAL(
"makePool failed for " << (*sharedWriterTool)->name());
232 return StatusCode::FAILURE;
233 }
234 else {
236 }
237
238
239 StatusCode mySc = (*sharedWriterTool)->exec();
240 if(!mySc.isSuccess()) {
242 return StatusCode::FAILURE;
243 }
244
245
247 ATH_MSG_FATAL(
"Cannot make mother process a client for Conversion Service");
248 return StatusCode::FAILURE;
249 }
250 }
251
252
253
255
257 ? maxevt
260 if(!scEvtProc.isSuccess()) {
261 if(nEventsToProcess) {
262 ATH_MSG_FATAL("Unable to process first " << nEventsToProcess << " events in the master");
263 }
264 else {
265 ATH_MSG_FATAL("Unable to process first event in the master");
266 }
267 return scEvtProc;
268 }
269 }
270
271
272 ServiceHandle<IIoComponentMgr> ioMgr(
"IoComponentMgr",
name());
276
277
278 fflush(NULL);
279
280
281 if(sharedWriterWithFAFE && !
m_dataShare->makeClient(0).isSuccess()) {
282 ATH_MSG_FATAL(
"Cannot make mother process not client for Conversion Service");
283 return StatusCode::FAILURE;
284 }
285
287
288
290
291
293 for(;
it!=itLast; ++
it) {
294 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
295 (*it)->useFdsRegistry(registry);
296 (*it)->setRandString(randStream.str());
297 (*it)->setMaxEvt(maxevt);
298 (*it)->setMPRunStop(this);
300 incSvc->fireIncident(Incident(
name(),
"PreFork"));
301 }
303 if(nChildren==-1) {
305 return StatusCode::FAILURE;
306 }
307 else {
309 }
310 }
311
314 return StatusCode::FAILURE;
315 }
316
317
318 for(it=
m_tools.begin(); it!=itLast; ++it) {
319 if(sharedWriterWithFAFE && (*it)->name() == "AthMpEvtLoopMgr.SharedWriterTool") continue;
320 if((*it)->exec().isFailure()) {
321 ATH_MSG_FATAL(
"Unable to submit work to the tool " << (*it)->name());
322 return StatusCode::FAILURE;
323 }
324 }
325
327
335 }
336
339 std::vector<std::string> logs;
340 for(it=
m_tools.begin(); it!=itLast; ++it) {
341 (*it)->subProcessLogs(logs);
342 for(
size_t i=0;
i<logs.size();++
i) {
343 std::cout <<
"\n File: " << logs[
i] <<
"\n" << std::endl;
345 log.open(logs[i].c_str(),std::ifstream::in);
348 std::getline(log,line);
349 std::cout <<
line << std::endl;
350 }
352 }
353 }
355 }
356
359 else
361}
#define ATH_CHECK
Evaluate an expression and check for errors.
#define ATH_MSG_WARNING(x)
std::vector< unsigned long > m_samplesSize
SmartIF< IDataShare > m_dataShare
ToolHandleArray< IAthenaMPTool > m_tools
Gaudi::Property< int > m_nWorkers
Gaudi::Property< std::string > m_workerTopDir
Gaudi::Property< bool > m_collectSubprocessLogs
std::shared_ptr< AthenaInterprocess::FdsRegistry > extractFds()
Gaudi::Property< int > m_nEventsBeforeFork
Gaudi::Property< bool > m_isPileup
std::vector< unsigned long > m_samplesRss
Gaudi::Property< std::string > m_strategy
StatusCode generateOutputReport()
std::vector< unsigned long > m_samplesSwap
std::vector< unsigned long > m_samplesPss
Gaudi::Property< int > m_nMemSamplingInterval
time(flags, cells_name, *args, **kw)
::StatusCode StatusCode
StatusCode definition for legacy code.
mkdir(path, recursive=True)